目录
频道首页
多线程设计模式
收藏
0
Rocky-BCRJ 最近修改于 2023-05-12 00:14:24

[toc]

前言

圈主 [Rocky编程日记] 学习多线程的设计模式笔记记录。希望我写得笔记你能够喜欢, 希望我写的笔记能够给你提供帮助。同时若笔记中存在不对的地方,那一定是圈主当时的理解还不够, 希望你能够及时指出嗷~

目前正在整理ing, 还请别急~

代码仓库地址:

code-multithread-pattern

需要引用 如下 pom文件依赖

 &nbsp; &nbsp; &nbsp; &nbsp;<dependency>
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;<groupId>org.slf4j</groupId>
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;<artifactId>slf4j-log4j12</artifactId>
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;<version>1.7.21</version>
 &nbsp; &nbsp; &nbsp; &nbsp;</dependency>

多线程程序的评价标准

  • 安全性 ---- 不损坏对象

    • 不损坏对象。 程序正常运行的必要条件之一。对象损坏是指对象的状态和设计者的原意不一致,通常是指对象的字段的值并非预期值。

    • 判断一个类是否是线程安全的: ** 一个类即使被多个线程使用,也可以确保安全性, 那么这个类就是线程安全的。**

    • 什么是线程兼容 [Bloch] ? ArrayList 虽然是非线程安全的, 但通过执行适当的互斥处理, 也可以安全地使用。

    • 某个线程是线程安全的还是非线程安全的, 与这个类的方法是否是 synchronized 方法无关。

  • 生存性 ---- 必要的时候能够被执行

    • 生存性是指无论是什么时候, 必要的处理都一定能够被执行。

    • 即使对象没有损坏, 也不一定代表程序就一定好。极端一点说,假如程序在运行过程中突然停止, 这时, 由于处理已经停止, 对象的状态就不会发生改变, 所以对象状态也就不会异常。虽然符合 "安全性" , 但无法运行的程序根本没有意义。

    • 有时候生存性 和 安全性 会相互制约。例如, 有时候只重视安全性, 生存性就会下降。例如 死锁(deadlock), 多个线程互相等待对方释放锁的情形。

  • 可复用性 ---- 类可重复利用

    • 指类能够重复利用。

    • 类如果能够作为组件从正常运行的软件中分割出来,那么就说明这个类有很高的可复用性。

    • J2 SE 5.0 中引入的 java.util.concurrent包中提供了便于多线程编程的可复用性高的类。

  • 性能 ---- 能快速、大批量地执行处理

    • 指能快速、大批量地执行处理。

    • 影响性能可能因素

      • 吞吐量 : 单位时间内完成的处理数量。能完成的处理越多, 则表示吞吐量越大。

      • 响应性 : 从发出请求到收到响应的时间。时间越短, 响应性也就越好。举例: 相比于按下按钮后无任何反应, 10秒后才提示 "处理完毕" 这种方式, 在按下按钮时立刻提示 "处理开始" 这种方式的响应性更高, 即使到处理结束花费的时间稍多一点也没关系。响应性好也称为 "等待时间" 短。

      • 容量 : 指可同时进行的处理数量。

      • 效率、可伸缩性、降级等

Single Threaded Execution 模式

Single Threaded Execution 模式简介

意思是 "以一个线程执行 "。类似于独木桥同一时间内只允许一个人通行一样, 该模式用于设置限制, 以确保同一时间内只能让一个线程执行处理。

该模式有时候也被称为 "临界区""临界域" 这个名称侧重于执行处理的线程(过桥的人), 而临界区域临界域的名称则侧重于执行范围(人过的桥)

案例简介: 模拟三位通行者频繁通过一个一次只允许一个人经过的门的情形。当通行者通过门的时候, 统计人数便会递增。并且程序还会记录通行者的 "姓名与出生地"。

通行者

public class User {
​
 &nbsp; &nbsp;private String name;
​
 &nbsp; &nbsp;private String address;
​
 &nbsp; &nbsp;public User(String name, String address) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.name = name;
 &nbsp; &nbsp; &nbsp; &nbsp;this.address = address;
 &nbsp;  }
​
 &nbsp; &nbsp;public String getName() {
 &nbsp; &nbsp; &nbsp; &nbsp;return name;
 &nbsp;  }
​
 &nbsp; &nbsp;public void setName(String name) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.name = name;
 &nbsp;  }
​
 &nbsp; &nbsp;public String getAddress() {
 &nbsp; &nbsp; &nbsp; &nbsp;return address;
 &nbsp;  }
​
 &nbsp; &nbsp;public void setAddress(String address) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.address = address;
 &nbsp;  }
}
​

用户任务

/**
 * 用户任务
 */
public class UserThread extends Thread {
​
 &nbsp; &nbsp;private Logger log = LoggerFactory.getLogger(this.getClass());
​
 &nbsp; &nbsp;/**
 &nbsp; &nbsp; * final 与 创建线程安全的实例
 &nbsp; &nbsp; */
 &nbsp; &nbsp;private final SafeGate safeGate;
​
 &nbsp; &nbsp;private final User user;
​
 &nbsp; &nbsp;public UserThread(SafeGate safeGate, User user) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.safeGate = safeGate;
 &nbsp; &nbsp; &nbsp; &nbsp;this.user = user;
 &nbsp;  }
​
 &nbsp; &nbsp;@Override
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;log.info("线程: {} 姓名:{} 经过安全门",Thread.currentThread().getName(),user.getName());
 &nbsp; &nbsp; &nbsp; &nbsp;while (true) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;safeGate.pass(user);
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}
​

安全门

public class SafeGate {
​
 &nbsp; &nbsp;private &nbsp;int number;
​
 &nbsp; &nbsp;private String name;
​
 &nbsp; &nbsp;private String address;
​
 &nbsp; &nbsp;public void pass(User user) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.number++;
 &nbsp; &nbsp; &nbsp; &nbsp;this.name = user.getName();
 &nbsp; &nbsp; &nbsp; &nbsp;// 休眠
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(1000);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;this.address = user.getAddress();
 &nbsp; &nbsp; &nbsp; &nbsp;check();
 &nbsp;  }
​
 &nbsp; &nbsp;@Override
 &nbsp; &nbsp;public String toString() {
 &nbsp; &nbsp; &nbsp; &nbsp;return "No." + number &nbsp;+ ": "+ name + " : " + address;
 &nbsp;  }
​
 &nbsp; &nbsp;private void check() {
 &nbsp; &nbsp; &nbsp; &nbsp;if (name.charAt(0) != address.charAt(0)) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("=== BROKEN ===" + toString());
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

测试

public class SafeGateTest {
​
 &nbsp; 
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;SafeGate safeGate = new SafeGate();
 &nbsp; &nbsp; &nbsp; &nbsp;new UserThread(safeGate,new User("rocky", "ro_Zhou")).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new UserThread(safeGate,new User("luo","mei_Zhou")).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new UserThread(safeGate,new User("chen","chen_Zhou")).start();
 &nbsp;  }
}

结果

 === BROKEN ===No.3: rocky : mei_Zhou
 === BROKEN ===No.3: rocky : mei_Zhou
 === BROKEN ===No.6: chen : ro_Zhou
 === BROKEN ===No.6: chen : ro_Zhou
 === BROKEN ===No.6: chen : ro_Zhou
 === BROKEN ===No.10: rocky : chen_Zhou
 === BROKEN ===No.10: rocky : chen_Zhou
 === BROKEN ===No.12: chen : mei_Zhou
 === BROKEN ===No.12: chen : mei_Zhou
 === BROKEN ===No.15: luo : mei_Zhou
 === BROKEN ===No.16: luo : ro_Zhou
 === BROKEN ===No.17: rocky : chen_Zhou

休眠了一段时间 , 控制台日志显示与预期结果不一致。不知道为啥会出现原因的可以 搜一搜 缓存一致性协议(MESI)

改造使之安全

/**
 * 安全门
 */
public class SafeGate {
​
 &nbsp; &nbsp;private &nbsp;int number;
​
 &nbsp; &nbsp;private String name;
​
 &nbsp; &nbsp;private String address;
​
 &nbsp; &nbsp;public synchronized void pass(User user) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.number++;
 &nbsp; &nbsp; &nbsp; &nbsp;this.name = user.getName();
 &nbsp; &nbsp; &nbsp; &nbsp;// 休眠
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(1000);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;this.address = user.getAddress();
 &nbsp; &nbsp; &nbsp; &nbsp;check();
 &nbsp;  }
​
 &nbsp; &nbsp;@Override
 &nbsp; &nbsp;public synchronized String toString() {
 &nbsp; &nbsp; &nbsp; &nbsp;return "No." + number &nbsp;+ ": "+ name + " : " + address;
 &nbsp;  }
​
 &nbsp; &nbsp;private void check() {
 &nbsp; &nbsp; &nbsp; &nbsp;if (name.charAt(0) != address.charAt(0)) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("=== BROKEN ===" + toString());
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

Single Threaded Execution 模式中的登场角色

SharedResource(共享资源)

Single Threaded Execution 模式中出现了一个发挥 SharedResource(共享资源) 作用的类。由 SafeGate扮演 SharedResource 角色。

SharedResource 角色是可被多个线程访问的类, 包含很多方法, 但这些方法主要分为如下两类。

  • safeMethod: 多个线程同时调用也不会发生问题的方法

  • unsafeMethod:多个线程同时会调用会发生问题, 因此必须加以保护的方法。

Single Threaded Execution模式会保护 unsafeMethod, 使其同时只能由一个线程访问。Java则是通过 unsafeMethod声明 为 synchronzied方法来保护。我们将只允许单个线程执行的程序范围称为临界区。

在什么情况下使用 Single Threaded Execution 呢 ?

  • 多线程时

    • 单线程程序中并不需要使用 Single Threaded Execution模式, 因此也就无需使用 synchronzied 方法。当然, 在 synchronzied 方法并不会破坏程序的安全性。但 synchronzied 方法要比调用一般方法花费时间, 这会稍微降低程序性能。
  • 多个线程访问时

  • 状态有可能发生变化

    • 之所以需要使用 Single Threaded Execution 模式, 是因为 ShareResource 角色的状态会变化。

    • 如果在创建实例后, 实例的状态再也不会发生变化, 那就无需使用 Single Threaded Execution 模式。

  • 需要确保安全性时

    • 只有在需要确保安全性时, 才需要使用 Single Threaded Execution 模式

    • 例如, Java的集合类大多都是非线程安全的。这是为了在不需要考虑安全性的时候提高程序运行速度。

    • 用户在使用类时, 需要考虑自己要用的类是否是线程安全的。

生存性与死锁

Single Thread Execution模式中, 满足下列条件时, 死锁就会发生。

  • 存在多个 SharedResource 角色的锁的同时, 还想获取其他 SharedResource角色的锁

  • 获取SharedResource角色的锁的顺序并不固定(SharedResource角色是对称的)

可复用性和继承反常

假设我们现在要编写 一个 SharedResource 角色的子类。如果子类能够访问 SharedResource角色的字段, 那么编写子类的开发人员就可能会不小心编写出无保护的 unsafeMethod。即使能够确保好不容易编写的 SharedResource角色的安全性, 在子类化时还是有可能会失去安全性。如果不将包含子类在内的所有 unsafeMethod 都声名为 synchronized方法, 就无法确定 SharedResource 角色的安全性。

在 面向对象的程序设计中, 伴随子类化而出现的 "继承" 起着非常重要的作用。但对于多线程程序设计来说, 继承会引起一些麻烦的问题。我们通常称之为 继承反常。

临界区的大小和性能

Single Threaded Execution 模式会降低程序性能, 原因有如下两个方面。

  • 获取锁花费时间 : 进入 synchronized 方法时, 线程需要获取对象的锁, 该处理会花费时间。

    如果 SharedResource角色的数量减少了, 那么要获取的锁的数量也会相应地减少, 从而就能够抑制性能的下降了。

  • 线程冲突引起的等待 : 当线程执行临界区内的处理时, 其他想要进入临界区的线程会阻塞。这种状况称为线程冲突。发生冲突时, 程序的整体性能会随着线程等待时间的增加而下降。

    如果尽可能地缩小临界区的范围, 降低线程冲突的概率, 那么就能够抑制性能的下降

Immutable Object(不可变对象) 模式

Immutable Object模式简介

Java.lang.String 类是用于表示字符串, String 类中并没有修改字符串内容的方法。也就是说, String 的实例所表示的字符串内容绝对不会发生变化。

正因为如此, String类中的方法无需声明为 synchronized。 因为实例的内部状态不会发生改变, 所以无论 String 实例被多少个线程访问,也无需执行线程的互斥处理。

Immutable就是不变的、不发生改变。

Immutable模式中存在着确保实例状态不发生改变的类。在访问这些实例时不需要执行耗时的互斥处理。如果能用好该模式,就可以提高程序性能。

如String就是一个不可变(immutable)类。

示例代码

User类

public class User {
​
 &nbsp; &nbsp;private final String name;
​
 &nbsp; &nbsp;private final String address;
​
 &nbsp; &nbsp;public User(String name, String address) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.name = name;
 &nbsp; &nbsp; &nbsp; &nbsp;this.address = address;
 &nbsp;  }
​
​
​
 &nbsp; &nbsp;@Override
 &nbsp; &nbsp;public String toString() {
 &nbsp; &nbsp; &nbsp; &nbsp;return "User{" +
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;"name='" + name + '\'' +
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;", address='" + address + '\'' +
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;'}';
 &nbsp;  }
​
 &nbsp; &nbsp;public String getName() {
 &nbsp; &nbsp; &nbsp; &nbsp;return name;
 &nbsp;  }
​
 &nbsp; &nbsp;public String getAddress() {
 &nbsp; &nbsp; &nbsp; &nbsp;return address;
 &nbsp;  }
}

PrintPersonThread

public class PrintPersonThread extends Thread {
 &nbsp; &nbsp;private User user;
​
 &nbsp; &nbsp;public PrintPersonThread(User user) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.user = user;
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;while (true) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " prints " + user);
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

PrintPersonThreadTest

public class PrintPersonThreadTest {
​
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;User alice = new User("Rocky编程日记", "Rocky编程日记");
 &nbsp; &nbsp; &nbsp; &nbsp;new PrintPersonThread(alice).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new PrintPersonThread(alice).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new PrintPersonThread(alice).start();
 &nbsp;  }
}

现在回想一下 Single Threaded Execution模式。该模式会将修改或引用实例状态的地方设置为临界区, 使这个区域只能由一个线程同时执行。但像 User类这样, 实例的状态绝对不会发生改变时, 情况就不一样啦。即使多个线程同时对该实例执行处理, 实例也不会出错, 因为实例的状态肯定不会发生改变。既然实例的状态肯定不会发生改变,那么也就无须使用 synchronized 来保护实例。因为即使想破坏实例, 也破坏不了。

Immutable 模式中的登场角色

  • Immutable(不可变的)

    • Immutable 角色是一个类, 在这个角色中,字段的值不可以修改, 也不存在修改字段内容的方法。Immutable 角色实例被创建后, 状态就不再发生变化。此时, 无需对Immutable角色应用 Single Threaded Execution模式。

在什么场景下使用Immutable模式

  • 实例创建后, 状态不再发生变化时

    • 首先, 如前面反复讲述的那样, "实例创建后, 状态不再发生变化"是必要条件。

    • 实例的状态是由字段的值决定的,所以 "将字段声明为final字段, 且不存在setter方法"是重点所在

    • 即使字段是final 字段, 且不存在 setter 方法, 也有可能不是不可变的(immutable)。因为即使字段的值不会发生变化, 字段引用的实例也有可能会发生变化。

  • 实例是共享的,且被频繁访问时

    • Immutable模式的优点是 "不需要使用 synchronized 进行保护"。

    • 不需要使用 synchronized 进行保护就意味着能够在不失去安全性和生存性的前提下提高性能。

    • 当实例被多个线程共享, 且有可能被频繁访问时, Immutable 模式的优点就会凸显出来。

考虑成对的 mutable 类 和 immutable 类 [性能]

  • 思考: 假设存在于一个类, 由于该类会被多个线程访问, 所以我们使用 synchronized 进行了保护。这里, 如果该类中存在 setting 方法, 那么 Immutable模式就不成立啦。

  • 假设 我们查看程序后发现实际上这个 setter 方法并未被使用, 那么就可以将字段声明为 final, 删除setter 方法, 并注意遵守不可变性,这样或许就能将其改造为可适用于 Immutable 模式了。

  • 实际上可能使用了 setter 方法。这时候, 该类就不适用于 Immutable 模式了, 但这还不是放弃的时候, 我们来仔细查看了一下整个程序是如何使用该类的, 看是不是可以分为使用 setter 方法的情况下与不使用 setter 方法的情况。如果可以明确分为这两种情况, 那我们是不是可以将这个类拆分为 mutable 类 和 immutable 类, 然后再设计成可以根据 mutable 实例创建 immutable 实例, 并可以反过来根据 immutable 实例创建 mutable 实例呢? 这样, immutable 类的部分就可以应用 Immutable 模式了。

  • Java 的标准类库中就有成对的 mutable 类 和 immutable 类, 例如:

    java.lang.StringBuffer类和java.lang.String类。StringBuffer类是表示字符串的 mutable 类。表示的字符串能够随便改写, 为了确保安全, 改写时需要妥善使用 synchronized。而 String 类是表示字符串的 immutable 类。String 实例表示的字符串不可以改写。

    StringBuffer类中有一个以 String 为参数的构造函数, 而 String类中有一个以 StringBuffer为参数的构造函数。也就是说, StringBuffer的实例和 String的实例可以互相转换。

为了确保不可变性(可复用性)

不可变性是一个很微妙的性质, 代码稍微一修改, 程序可能就会失去不可变性。如果从使用了 Immutable 模式的程序中删除了 synchronized, 那么当失去不可变性时,程序的安全性就会完全丧失, 所以一定要注意。

Guarded Suspension(保护性暂挂) 模式

Guarded Suspension 模式简介?

是“被保护着的”、“被防卫着的”意思,suspension则是“暂停”的意思。当现在并不适合马上执行某个操作时,就要求想要执行该操作的线程等待,这就是Guarded Suspension Pattern。 Guarded Suspension Pattern 会要求线程等候,以保障实例的安全性,其它类似的称呼还有guarded wait、spin lock等。

示例程序

Request

/**
 * 一个请求的类
 */
public class Request {
 &nbsp; &nbsp;private final String name;
​
 &nbsp; &nbsp;public Request(String name) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.name = name;
 &nbsp;  }
​
 &nbsp; &nbsp;public String getName() {
 &nbsp; &nbsp; &nbsp; &nbsp;return name;
 &nbsp;  }
​
 &nbsp; &nbsp;public String toString() {
 &nbsp; &nbsp; &nbsp; &nbsp;return "[ Request " + name + " ]";
 &nbsp;  }
}

RequestQueue

/**
 * 依次存放请求的类
 */
public class RequestQueue {
 &nbsp; &nbsp;private final Queue<Request> queue = new LinkedList<Request>();
​
 &nbsp; &nbsp;/**
 &nbsp; &nbsp; * 1. 施加守护条件进行保护
 &nbsp; &nbsp; * 2. 不等待和等待的情况
 &nbsp; &nbsp; * 3. 执行 wait, 等待条件发生变化 [明确线程在等待什么?应该何时执行notify/notifyAll]
 &nbsp; &nbsp; * 4. 执行到while的下一条语句时一定能确定的事情
 &nbsp; &nbsp; * @return
 &nbsp; &nbsp; */
 &nbsp; &nbsp;public synchronized Request getRequest() {
 &nbsp; &nbsp; &nbsp; &nbsp;while (queue.peek() == null) { // 守护条件的逻辑非运算
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + ": wait() begins, queue = " + queue);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;wait();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + ": wait() ends, &nbsp; queue = " + queue);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;// != null 守护条件
 &nbsp; &nbsp; &nbsp; &nbsp;return queue.remove();
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized void putRequest(Request request) {
 &nbsp; &nbsp; &nbsp; &nbsp;queue.offer(request);
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + ": notifyAll() begins, queue = " + queue);
 &nbsp; &nbsp; &nbsp; &nbsp;notifyAll();
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + ": notifyAll() ends, queue = " + queue);
 &nbsp;  }
}
​

ClientThread

/**
 * 发送请求的类
 */
public class ClientThread extends Thread {
 &nbsp; &nbsp;private final Random random;
 &nbsp; &nbsp;private final RequestQueue requestQueue;
​
 &nbsp; &nbsp;public ClientThread(RequestQueue requestQueue, String name, long seed) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.requestQueue = requestQueue;
 &nbsp; &nbsp; &nbsp; &nbsp;this.random = new Random(seed);
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < 10000; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Request request = new Request("No." + i);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " requests " + request);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;requestQueue.putRequest(request);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(random.nextInt(1000));
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

ServerThread

/**
 * 接收请求的类
 */
public class ServerThread extends Thread {
 &nbsp; &nbsp;private final Random random;
 &nbsp; &nbsp;private final RequestQueue requestQueue;
​
 &nbsp; &nbsp;public ServerThread(RequestQueue requestQueue, String name, long seed) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.requestQueue = requestQueue;
 &nbsp; &nbsp; &nbsp; &nbsp;this.random = new Random(seed);
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < 10000; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Request request = requestQueue.getRequest();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " handles  " + request);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(random.nextInt(1000));
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

SuspensionTest

/**
 * 测试程序的类
 */
public class SuspensionTest {
​
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;RequestQueue requestQueue = new RequestQueue();
 &nbsp; &nbsp; &nbsp; &nbsp;new ClientThread(requestQueue, "Rocky", 3141592L).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new ServerThread(requestQueue, "Rocky_BCRJ", 6535897L).start();
 &nbsp;  }
}

Guarded Suspension 模式中的角色

  • GuardedObject(被守护的对象)

    GuardedObject角色是一个持有被守护的方法 (guardeMethod) 的类。当线程执行 guardeMethod方法时, 若守护条件成立, 则可以立即执行; 当守护条件不成立时, 就要进行等待。守护条件的成立与否会随着 GuardedObject角色的状态不同而发生变化。

    还有可能持有其他改变实例状态(特别是改变守护条件)的方法(stateChangingMethod)。

    在 Java 中 guardeMethod通过 while语句和wait方法来实现, stateChangingMethod则通过 notify/notifyAll 方法来实现

在什么场景下使用 Guarded Suspension

  • 附加条件的 synchronized

    • Single Threaded Execution 模式中, 只要有一个线程进入临界区, 其他线程就无法进入, 只能等待。

    • Guarded Suspension模式中, 线程是否等待取决于守护条件。Guarded Suspension模式是在 Single Threaded Execution模式的基础上附加了条件而形成的。也就是说, Guarded Suspension模式是类似于附加条件的 synchronized 这样的模式。

  • 多线程版本的 if

    • 单线程中, 执行操作的主体操作线程只有一个。如果该类唯一线程进入等待线程, 就没有线程来改变实例的状态。
  • 忘记改变状态与生存性

    • wait的线程每次被 notify / notifyAll 时都会检查守护条件。不管被 notify / notifyAll多少次, 如果守护条件不成立, 线程都会随着 while 再次 wait
  • waitnotify / notifyAll 的责任

    • 示例程序中, 你会发现 wait / notifyAll 只出现在 RequestQueue类中, 而并未出现在 ClientThread、ServerThread类中。Guarded Suspension模式的实现封装在 RequestQueue类中。

    • 这种将 wait / notifyAll 隐藏起来的做法对 RequestQueue类的复用性来说是非常重要的。其他类无需 考虑 wait / notifyAll的问题, 只需要调用就可以了。

  • 各种称呼

    • 特征: 存在循环, 存在条件检查, 因为某种原因而 " 等待"

    • guarded suspension: 被守护而暂停执行的含义

    • guarded wait : 被守护而等待, 其实现方法为线程使用 wait进行等待, 被notify / notifyAll 后, 再次检查条件是否成立。由于线程在使用 wait进行等待的期间是待在等待队列中停止执行的。所以并不会浪费虚拟机的处理时间。

    • busy wait : 忙于等待, 其实现方法并未使用 wait 进行等待, 而是执行 yield的同时检查守护条件。由于等待端的线程是在持续运行的。所以会浪费虚拟机的处理时间。

    • spin lock : 通过旋转来锁定, 在条件成立之前, 通过 while 循环 旋转等待的情形。

    • polling : 反复检查某个事件是否发生, 若发生, 则执行相应处理的方式 。

Baking 模式

Balking 模式 简介

Balking模式: 如果现在不适合执行这个操作, 或者没必要执行这个操作, 就停止处理, 直接返回。

示例程序

Data

/**
 * 可以修改并保存的数据的类
 */
public class Data {
 &nbsp; &nbsp;private final String filename; &nbsp;// 保存的文件名
 &nbsp; &nbsp;private String content; &nbsp; &nbsp; &nbsp; &nbsp; // 数据内容
 &nbsp; &nbsp;private boolean changed; &nbsp; &nbsp; &nbsp; &nbsp;// 修改后的内容若未保存,则为true
​
 &nbsp; &nbsp;public Data(String filename, String content) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.filename = filename;
 &nbsp; &nbsp; &nbsp; &nbsp;this.content = content;
 &nbsp; &nbsp; &nbsp; &nbsp;this.changed = true;
 &nbsp;  }
​
 &nbsp; &nbsp;// 修改数据内容
 &nbsp; &nbsp;public synchronized void change(String newContent) {
 &nbsp; &nbsp; &nbsp; &nbsp;content = newContent;
 &nbsp; &nbsp; &nbsp; &nbsp;changed = true;
 &nbsp;  }
​
 &nbsp; &nbsp;// 若数据内容修改过,则保存到文件中
 &nbsp; &nbsp;public synchronized void save() throws IOException {
 &nbsp; &nbsp; &nbsp; &nbsp;if (!changed) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " balks");
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return;
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;doSave();
 &nbsp; &nbsp; &nbsp; &nbsp;changed = false;
 &nbsp;  }
​
 &nbsp; &nbsp;// 将数据内容实际保存到文件中
 &nbsp; &nbsp;private void doSave() throws IOException {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " calls doSave, content = " + content);
 &nbsp; &nbsp; &nbsp; &nbsp;Writer writer = new FileWriter(filename);
 &nbsp; &nbsp; &nbsp; &nbsp;writer.write(content);
 &nbsp; &nbsp; &nbsp; &nbsp;writer.close();
 &nbsp;  }
}
​
​

SaverThread

public class SaverThread extends Thread {
 &nbsp; &nbsp;private final Data data;
 &nbsp; &nbsp;public SaverThread(String name, Data data) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.data = data;
 &nbsp;  }
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (true) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;data.save(); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 要求保存数据
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(1000); &nbsp; &nbsp; // 休眠约1秒
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  } catch (IOException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

ChangerThread

public class ChangerThread extends Thread {
 &nbsp; &nbsp;private final Data data;
 &nbsp; &nbsp;private final Random random = new Random();
 &nbsp; &nbsp;public ChangerThread(String name, Data data) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.data = data;
 &nbsp;  }
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; true; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;data.change("No." + i); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // 修改数据
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(random.nextInt(1000)); // 执行其他操作
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;data.save(); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 显式地保存
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  } catch (IOException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

Test

public class Test {
​
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;Data data = new Data("data.txt", "(empty)");
 &nbsp; &nbsp; &nbsp; &nbsp;new ChangerThread("ChangerThread", data).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new SaverThread("SaverThread", data).start();
 &nbsp;  }
}

Balking 模式中登场角色

  • GuardedObject(被守护的对象)

    GuardedObject角色是一个拥有被防护的方法(guardedMethod)的类。当线程执行 guardedMethod方法时, 若守护条件成立, 则执行实际的处理。而当守护条件不成立时, 则不执行实际的处理, 直接返回。守护条件的成立与否, 会随着 GuardedObject 角色的状态变化而发生变化。

    除了 guardedMethod之外, GuardedObject角色还有可能有其他来改变状态的方法(stateChangingMethod)。

在什么场景下使用 Balking 模式

  • 并不需要执行时

  • 不需要等待守护条件成立时

  • 守护条件仅在第一次成立时

Producer-Consumer(生产者/消费者) 模式

模式简介

就是生产者-消费者模式。生产者和消费者在为不同的处理线程,生产者必须将数据安全地交给消费者,消费者进行消费时,如果生产者还没有建立数据,则消费者需要等待。 一般来说,可能存在多个生产者和消费者,不过也有可能生产者和消费者都只有一个,当双方都只有一个时,我们也称之为Pipe Pattern

示例程序

MakerThread

用于制作蛋糕, 并将其放置到桌子上, 也就是糕点师 。

该类会先暂停一段随机时长, 然后再调用 Table 类的 put 方法将制作好的蛋糕放置到桌子上。暂停的这段时间模拟的是 "制作蛋糕所花费的时间"。

public class MakerThread extends Thread {
 &nbsp; &nbsp;private final Random random;
 &nbsp; &nbsp;private final Table table;
 &nbsp; &nbsp;private static int id = 0; // 蛋糕的流水号(所有糕点师共用)
 &nbsp; &nbsp;public MakerThread(String name, Table table, long seed) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.table = table;
 &nbsp; &nbsp; &nbsp; &nbsp;this.random = new Random(seed);
 &nbsp;  }
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (true) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(random.nextInt(1000));
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;String cake = "[ Cake No." + nextId() + " by " + getName() + " ]";
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;table.put(cake);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
 &nbsp; &nbsp;private static synchronized int nextId() {
 &nbsp; &nbsp; &nbsp; &nbsp;return id++;
 &nbsp;  }
}
​

EaterThread

用于表示从桌子上取蛋糕吃的客人。

客人通过 Table 类的take 方法取桌子上的蛋糕。然后 , 与 MakerThread 类一样, EaterThread也会暂停一段随机长的时间。这段暂停时间模拟的是 "吃蛋糕花费的时间"。

public class EaterThread extends Thread {
 &nbsp; &nbsp;private final Random random;
 &nbsp; &nbsp;private final Table table;
​
 &nbsp; &nbsp;public EaterThread(String name, Table table, long seed) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.table = table;
 &nbsp; &nbsp; &nbsp; &nbsp;this.random = new Random(seed);
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (true) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;String cake = table.take();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(random.nextInt(1000));
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

Table

public class Table {
 &nbsp; &nbsp;private final String[] buffer;
 &nbsp; &nbsp;private int tail; &nbsp;// 下次put的位置
 &nbsp; &nbsp;private int head; &nbsp;// 下次take的位置
 &nbsp; &nbsp;private int count; // buffer中的蛋糕个数
​
 &nbsp; &nbsp;public Table(int count) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.buffer = new String[count];
 &nbsp; &nbsp; &nbsp; &nbsp;this.head = 0;
 &nbsp; &nbsp; &nbsp; &nbsp;this.tail = 0;
 &nbsp; &nbsp; &nbsp; &nbsp;this.count = 0;
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized void put(String cake) throws InterruptedException {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " puts " + cake);
 &nbsp; &nbsp; &nbsp; &nbsp;while (count >= buffer.length) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " wait BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;wait();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " wait END");
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;buffer[tail] = cake;
 &nbsp; &nbsp; &nbsp; &nbsp;tail = (tail + 1) % buffer.length;
 &nbsp; &nbsp; &nbsp; &nbsp;count++;
 &nbsp; &nbsp; &nbsp; &nbsp;notifyAll();
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized String take() throws InterruptedException {
 &nbsp; &nbsp; &nbsp; &nbsp;while (count <= 0) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " wait BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;wait();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " wait END");
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;String cake = buffer[head];
 &nbsp; &nbsp; &nbsp; &nbsp;head = (head + 1) % buffer.length;
 &nbsp; &nbsp; &nbsp; &nbsp;count--;
 &nbsp; &nbsp; &nbsp; &nbsp;notifyAll();
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " takes " + cake);
 &nbsp; &nbsp; &nbsp; &nbsp;return cake;
 &nbsp;  }
}
​

Producer-Consumer 模式中的登场角色

  • Data: Producer角色生成 Data 角色, 并将其传递给 Channel

  • Producer (生产者) : 从 Channel 角色获取 Data角色并使用 。

  • Consumer (消费者) : 从 Channel角色获取 Data角色并使用。

  • Channel (通道) : Channel 角色保管从 Producer角色来获取 Data角色, 还会响应 Consumer角色的请求, 传递 Data角色。为了确保安全性, Channel角色会对Producer角色和 Consumer角色的访问执行互斥处理。

    Producer角色将 Data角色传递给 Channel角色时, 如果 Channel角色的状态不适合接收 Data角色, 那么 Producer角色将一直等待, 直至 Channel角色的状态变为可以接收为止。

    Consumer角色从Channel角色获取 Data角色时, 如果 Channel角色中没有可以传递的 Data角色, 那么 Consumer角色将一直等待, 直至 Channel 角色的状态变为可以传递 Data角色为止。

    当存在多个 Producer角色 和 Consumer角色时, 为了避免各处理互相影响, Channel角色需要执行互斥处理。

    Channel角色相当于 Producer角色 和 Consumer角色之间, 承担用于传递 Data角色的中转站、通道的任务。

在什么场景下使用 Producer-Consumer

  • 守护安全性的 Channel 角色

    在该模式下, 承担安全守护责任的是 Channel 角色。Channel角色执行线程间的互斥处理, 确保 Producer角色正确地将 Data角色传递给 Consumer角色。

  • Producer角色不可以直接调用 Consumer方法么?

    • 直接调用方法

      Consumer 角色想要获取Data角色, 通常都是因为想使用这些 Data角色来执行某些处理。如果 Producer角色直接调用 Consumer角色的方法, 那么执行处理的就不是 Consumer角色的线程, 而是 Producer角色的线程了。

      这样一来, 执行处理花费的时间必须由 Producer角色的线程来承担, 准备下一个数据的处理也会相应发生延迟。这样子会使程序的响应性变得很差。

    • 插入 Channel 角色

      Producer角色将 Data角色传递给 Channel角色之后, 无需等待 Consumer角色对 Data角色进行处理。可以立即开始准备下一个 Data角色。也就是说, Producer角色可以持续不断地创建Data角色。Producer角色不会受到 Consumer角色的处理进展状况的影响。

  • Channel 角色的剩余空间所导致的问题

    在示例程序中, 桌子上最多可以放置3个蛋糕。当糕点师放置蛋糕时,如果桌子上的蛋糕个数在3个以内, 则可以顺利放置, 但如果是4个及4个以上时, 就必须等待客人取走蛋糕才行。如果客人吃得慢, 糕点师就必须等待很久。

    如果桌子上可以放置的蛋糕个数增多会怎么样呢? 这时, 就算客人吃得慢, 糕点师也无需等待, 可以直接制作蛋糕并放到桌子上。桌子上可以放置的蛋糕个数(buffer字段的元素个数)是用于缓冲MakerThreadEaterThread之间的处理速度差的。

    当然, 如果客人吃蛋糕的平均速度小于糕点师制作蛋糕的速度, 那么桌子上的蛋糕会逐渐增多, 一段时间后还是会达到 buffer字段的元素个数上限。

    如果使用 java.util.LinkedList类, 那么创建的 Channel 角色能储存的实例个数就不会存在上限。但这时, 如果 EaterThread的平均速度较慢, 一段时间之后(或许要过很长一段时间) 内存就会不足, 也就无法再创建表示蛋糕的实例。

  • 以什么顺序传递 Data 角色呢

    • 队列 ---- 先接收的先传递

    • 栈 ---- 后接收的先传递

    • 优先队列 ---- "优先"的先传递

  • "存在中间角色"的意义

    • 线程的协调运行要考虑 "放在中间的东西"

    • 线程的互斥处理要考虑 "应该保护的东西"

  • Consumer角色只有一个使会怎么样呢

    该模式考虑多个Producer给多个 Consumer传递数据的情况。这里, 我们来思考一下 Consumer角色只有一个时程序会怎么样。也就是 "多个 Producer角色和一个Consumer角色"的情况。

    如果 Consumer角色有一个, 也就是说处理 Channel角色中储存的Data角色的线程只有一个, 不需要注意Consumer角色的线程之间互相影响。如果 Consumer角色有多个, 我们就要注意不能让 Consumer角色的线程之间互相影响。[多生产者单消费者模型类似于事件分发机制]

Read-Write Lock 模式

Read-Write Lock 模式 简介

将读取与写入分开处理,在读取数据之前必须获取用来读取的锁定,而写入的时候必须获取用来写入的锁定。

因为读取时实例的状态不会改变,所以多个线程可以同时读取;

但是,写入会改变实例的状态,所以当有一个线程写入的时候,其它线程既不能读取与不能写入。

示例程序

Data

package code.rocky.readWriteLock;
​
public class Data {
 &nbsp; &nbsp;private final char[] buffer;
 &nbsp; &nbsp;private final ReadWriteLock lock = new ReadWriteLock();
​
 &nbsp; &nbsp;public Data(int size) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.buffer = new char[size];
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < buffer.length; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;buffer[i] = '*';
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;public char[] read() throws InterruptedException {
 &nbsp; &nbsp; &nbsp; &nbsp;lock.readLock();
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return doRead();
 &nbsp; &nbsp; &nbsp;  } finally {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 释放用于读取的锁
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;lock.readUnlock();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;public void write(char c) throws InterruptedException {
 &nbsp; &nbsp; &nbsp; &nbsp;lock.writeLock();
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;doWrite(c);
 &nbsp; &nbsp; &nbsp;  } finally {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 确保一定会调用 采用 before/after模式
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;lock.writeUnlock();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;// 用于创建一个新的 char 数组,来复制 buffer 字段的内容 ,并返回 newbuf
 &nbsp; &nbsp;private char[] doRead() {
 &nbsp; &nbsp; &nbsp; &nbsp;char[] newbuf = new char[buffer.length];
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < buffer.length; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;newbuf[i] = buffer[i];
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;slowly();
 &nbsp; &nbsp; &nbsp; &nbsp;return newbuf;
 &nbsp;  }
    // 用于执行实际的写入操作。该方法会以参数传入的字符c来填满 buffer字段。"以传入的字符填满数组" 这个操作并没有什么特别的意义, 只是为了让运行结果容易理解而已。注意: slowly(), 这里假定了 写入操作的时间比读取操作的时间长
 &nbsp; &nbsp;private void doWrite(char c) {
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < buffer.length; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;buffer[i] = c;
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;slowly();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;private void slowly() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(50);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}
​

WriteThread

package code.rocky.readWriteLock;
​
import java.util.Random;
​
public class WriterThread extends Thread {
 &nbsp; &nbsp;private static final Random random = new Random();
 &nbsp; &nbsp;private final Data data;
 &nbsp; &nbsp;private final String filler;
 &nbsp; &nbsp;private int index = 0;
​
 &nbsp; &nbsp;public WriterThread(Data data, String filler) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.data = data;
 &nbsp; &nbsp; &nbsp; &nbsp;this.filler = filler;
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (true) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;char c = nextchar();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;data.write(c);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 0~3000毫秒随机休眠
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(random.nextInt(3000));
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
    // 用于获取下一次应该写入的字符
 &nbsp; &nbsp;private char nextchar() {
 &nbsp; &nbsp; &nbsp; &nbsp;char c = filler.charAt(index);
 &nbsp; &nbsp; &nbsp; &nbsp;index++;
 &nbsp; &nbsp; &nbsp; &nbsp;if (index >= filler.length()) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;index = 0;
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;return c;
 &nbsp;  }
}
​
​

ReaderThread

package code.rocky.readWriteLock;
​
public class ReaderThread extends Thread {
 &nbsp; &nbsp;private final Data data;
​
 &nbsp; &nbsp;public ReaderThread(Data data) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.data = data;
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;long begin = System.currentTimeMillis();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < 20; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;char[] readBuf = data.read();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " reads " + String.valueOf(readBuf));
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;long time = System.currentTimeMillis() - begin;
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + ": time = " + time);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}
​

ReadWriteLock

package code.rocky.readWriteLock;
​
public final class ReadWriteLock {
 &nbsp; &nbsp;private int readingReaders = 0; // (A)…实际正在读取中的线程个数
 &nbsp; &nbsp;private int waitingWriters = 0; // (B)…正在等待写入的线程个数
 &nbsp; &nbsp;private int writingWriters = 0; // (C)…实际正在写入中的线程个数
 &nbsp; &nbsp;private boolean preferWriter = true; // 若写入优先,则为true
​
 &nbsp; &nbsp;public synchronized void readLock() throws InterruptedException {
 &nbsp; &nbsp; &nbsp; &nbsp;while (writingWriters > 0 || (preferWriter && waitingWriters > 0)) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;wait();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;readingReaders++; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // (A) 实际正在读取的线程个数加1
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized void readUnlock() {
 &nbsp; &nbsp; &nbsp; &nbsp;readingReaders--; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // (A) 实际正在读取的线程个数减1
 &nbsp; &nbsp; &nbsp; &nbsp;preferWriter = true;
 &nbsp; &nbsp; &nbsp; &nbsp;notifyAll();
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized void writeLock() throws InterruptedException {
 &nbsp; &nbsp; &nbsp; &nbsp;waitingWriters++; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // (B) 正在等待写入的线程个数加1
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (readingReaders > 0 || writingWriters > 0) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;wait();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  } finally {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;waitingWriters--; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // (B) 正在等待写入的线程个数减1
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;writingWriters++; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // (C) 实际正在写入的线程个数加1
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized void writeUnlock() {
 &nbsp; &nbsp; &nbsp; &nbsp;writingWriters--; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; // (C) 实际正在写入的线程个数减1
 &nbsp; &nbsp; &nbsp; &nbsp;preferWriter = false;
 &nbsp; &nbsp; &nbsp; &nbsp;notifyAll();
 &nbsp;  }
}
​
​

Read-Write Lock登场角色

  • Reader(读者): Reader 角色对 SharedResource角色执行 read 操作。

  • Writer(写者): Writer 角色对 SharedResource角色执行 write 操作。

  • SharedResource(共享资源) : SharedResource 角色表示的是 Reader角色和 Write角色二者共享的资源。SharedResource 角色提供不修改内部状态的操作(read) 和修改内部状态的操作(write)。

  • ReadWriterLock(读写锁) : ReadWriterLock角色提供了 SharedResource 角色实现 read 操作和 write操作所需的锁。

在什么场景下使用 Read-Write Lock

  • 利用 "读取"操作的线程之间不会冲突的特性来提高程序性能

    Read-Write Lock 模式利用了读取操作的线程之间不会冲突的特性。由于读取操作不会修改 SharedResource角色的状态, 所以彼此之间无需执行互斥处理。因此多个 Reader角色可以同时执行 read操作, 从而提高程序性能。

    性能的提升还需要考虑 "适合读取操作繁重时" 和 "适合读取频率比写入频率高时" 两个大方针。

  • 适合读取操作繁重时

    在单纯使用 Single Threaded Execution模式的情况下, 就算是 read操作, 每次也只能运行一个线程。如果 read 的操作很繁重, 那么使用 Read-Write Lock模式比使用 Single Threaded Execution模式更加合适

    但是, 因为Read-Write Lock模式的处理比 Single Thread Execution模式复杂, 所以当 read 的操作很简单(不耗费时间)时, Single Thread Execution模式反而会更加合适。

  • 适合读取频率比写入频率高时

    Read-Write Lock模式的优点是 Reader 角色之间不会发生冲突, 但是, 如果写入处理(write) 的频率很高, write 角色便会频繁停止 Reader角色的处理, 这样就无法体现出 Read-Write Lock 模式的优点了。

  • 锁的含义

    • synchronized可以用于获取实例的锁。Java的每个实例都持有一个"锁", 但同一个锁不可以由两个以上的线程同时获取。这种结构是 Java 编程规范规定的, Java虚拟机也是这样实现的。这也是Java语言一开始就提供的所谓的物理锁, Java程序并不能改变这种锁的运行。

    • 而 "用于读取的锁" 和 "用于写入的锁" 所指的锁与使用 synchronized获取的锁是不一样的。这并不是 Java编程规范规定的结构, 而是开发人员自己实现的一种结构。逻辑锁

Thread-Per-Message模式

Thread-Per-Message 模式简介

Thread-Per-Message 模式是指每个message一个线程,message可以理解成“消息”、“命令”或者“请求”。

每一个message都会分配一个线程,由这个线程执行工作,使用Thread-Per-Message Pattern时,“委托消息的一端”与“执行消息的一端”回会是不同的线程。

示例程序

Helper

public class Helper {
 &nbsp; &nbsp;public void handle(int count, char c) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp; &nbsp; &nbsp;  handle(" + count + ", " + c + ") BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < count; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;slowly();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.print(c);
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("");
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp; &nbsp; &nbsp;  handle(" + count + ", " + c + ") END");
 &nbsp;  }
 &nbsp; &nbsp;private void slowly() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(100);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}
​

Host

public class Host {
 &nbsp; &nbsp;private final Helper helper = new Helper();
 &nbsp; &nbsp;public void request(final int count, final char c) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp;  request(" + count + ", " + c + ") BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;helper.handle(count, c);
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp;  request(" + count + ", " + c + ") END");
 &nbsp;  }
}

HostTest

public class HostTest {
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;Host host = new Host();
 &nbsp; &nbsp; &nbsp; &nbsp;host.request(10, 'A');
 &nbsp; &nbsp; &nbsp; &nbsp;host.request(20, 'B');
 &nbsp; &nbsp; &nbsp; &nbsp;host.request(30, 'C');
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main END");
 &nbsp;  }
}
​

Thread-Per-Message模式中的登场角色

  • Client (委托人) : Client角色会向 Host 角色发出请求(request), 但是并不知道 Host 角色 是如何实现该请求的。

  • Host : Host角色收到 Client 角色的请求(request) 之后, 会新创建 并启动一个线程。新创建的线程将使用 Helper 角色来处理请求。

  • Helper (助手): Helper角色 为 Host 角色提供请求处理的功能。Host角色创建的新线程会利用 Helper角色。

在什么场景下使用 Thread-Per-Message模式

  • 提高响应性, 缩短延迟时间

    Thread-Per-Message 模式能够提高与 Client 角色 对应的 Host角色的响应性, 降低延迟时间。尤其是当 handle 操作非常耗时, 或者 handle 操作需要等待 输入/输出时, 效果非常明显。

    在 Thread-Per-Message 模式下, Host 角色会启动新的线程。由于启动线程也会花费时间, 所以想要提高响应性时, 是否使用 Thread-Per-Message 模型 取决于 "handle操作花费的时间" 和 "线程启动花费的时间" 之间的均衡。

    为了缩短线程启动花费的时间, 我们可以使用 worker Thread模式

  • 适用于操作顺序没有要求时

    在 Thread-Per-Message模式中, handle 方法并不一定是按 request 方法的调用顺序来执行的。

  • 适用于不需要返回值时

    在 Thread-Per-Message 模式中, request 方法并不会等待 handle 方法执行结束。所以 request 得不到 handle 的运行结果。因此, Thread-Per-Message模式适用于不需要获取返回值的情况。

  • 应用于服务器

    为了使服务器可以处理多个请求, 我们可以使用 Thread-Per-Message模式。服务器本身的线程接收客户端的请求, 而这些请求的实际处理则交由其他线程来执行, 服务器本身的线程则返回, 去等待客户端的其他请求。

  • 调用方法 + 启动线程 ---> 发送消息

    通常情况下, 当调用方法时, 该方法中的所有处理都被执行完之后, 控制权才会返回。Thread-Per-Message 模式的 request 方法也是一个普通方法, 所以当该方法中的处理被执行完毕后, 控制权就会返回。

    "request 方法真正想要的操作(显示字符串)执行了吗?" 虽然控制权从 request 返回了, 但这并等于显示 字符串的操作也就执行完了。虽然 request 触发了目标操作的开始(触发器), 但并不等待处理结束。

Worker Thread 模式

什么是 worker thread 模式

这是一个来自工作车间的故事, 在这里, 工人们负责组装塑料模型。

客人会将很多装有塑料模型的箱子带到工作车间来, 然后摆放在桌子上。

工人必须将客户送过来的塑料模型一个一个组装起来。他们会先取回放在桌子上的装有塑料模型的箱子, 然后在阅读了箱子中的说明书后开始组装。当一箱模型组装完成后, 工人们会继续去取下一个箱子。当所有模型全部组装完成后, 工人们会等待新的模型被送过来。

也称为 Thread Pool(线程池) 模式

示例程序

ClientThread类的线程会向 Channel类发送工作请求(委托) (说是工作, 其实只是显示出委托者的名字和委托编号)

Channel 类的实例雇佣了五个工人线程(WorkerThread) 进行工作。所有工人线程都在等待工作请求的到来。

工作请求到来后, 工人线程会从 Channel那里获取一项工作请求并开始工作。工作完成后, 工人线程会回到 Channel那里等待下一项工作请求。

Channel

public class Channel {
 &nbsp; &nbsp;private static final int MAX_REQUEST = 100;
 &nbsp; &nbsp;private final Request[] requestQueue;
 &nbsp; &nbsp;private int tail; &nbsp;// 下次putRequest的位置
 &nbsp; &nbsp;private int head; &nbsp;// 下次takeRequest的位置
 &nbsp; &nbsp;private int count; // Request的数量
​
 &nbsp; &nbsp;private final WorkerThread[] threadPool;
​
 &nbsp; &nbsp;public Channel(int threads) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.requestQueue = new Request[MAX_REQUEST];
 &nbsp; &nbsp; &nbsp; &nbsp;this.head = 0;
 &nbsp; &nbsp; &nbsp; &nbsp;this.tail = 0;
 &nbsp; &nbsp; &nbsp; &nbsp;this.count = 0;
​
 &nbsp; &nbsp; &nbsp; &nbsp;threadPool = new WorkerThread[threads];
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < threadPool.length; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;threadPool[i] = new WorkerThread("Worker-" + i, this);
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;public void startWorkers() {
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < threadPool.length; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;threadPool[i].start();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized void putRequest(Request request) {
 &nbsp; &nbsp; &nbsp; &nbsp;while (count >= requestQueue.length) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;wait();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;requestQueue[tail] = request;
 &nbsp; &nbsp; &nbsp; &nbsp;tail = (tail + 1) % requestQueue.length;
 &nbsp; &nbsp; &nbsp; &nbsp;count++;
 &nbsp; &nbsp; &nbsp; &nbsp;notifyAll();
 &nbsp;  }
​
 &nbsp; &nbsp;public synchronized Request takeRequest() {
 &nbsp; &nbsp; &nbsp; &nbsp;while (count <= 0) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;wait();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;Request request = requestQueue[head];
 &nbsp; &nbsp; &nbsp; &nbsp;head = (head + 1) % requestQueue.length;
 &nbsp; &nbsp; &nbsp; &nbsp;count--;
 &nbsp; &nbsp; &nbsp; &nbsp;notifyAll();
 &nbsp; &nbsp; &nbsp; &nbsp;return request;
 &nbsp;  }
}
​
​

Request

public class Request {
 &nbsp; &nbsp;private final String name;
 &nbsp; &nbsp;private final int number;
​
 &nbsp; &nbsp;public Request(String name, int number) {
 &nbsp; &nbsp; &nbsp; &nbsp;this.name = name;
 &nbsp; &nbsp; &nbsp; &nbsp;this.number = number;
 &nbsp;  }
​
 &nbsp; &nbsp;public void execute() {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(Thread.currentThread().getName() + " executes " + this);
 &nbsp;  }
​
 &nbsp; &nbsp;public String toString() {
 &nbsp; &nbsp; &nbsp; &nbsp;return "[ Request from " + name + " No." + number + " ]";
 &nbsp;  }
}

ClientThread

public class ClientThread extends Thread {
 &nbsp; &nbsp;private final Channel channel;
​
 &nbsp; &nbsp;public ClientThread(String name, Channel channel) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.channel = channel;
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; true; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Request request = new Request(getName(), i);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;channel.putRequest(request);
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

WorkerThread

public class WorkerThread extends Thread {
 &nbsp; &nbsp;private final Channel channel;
​
 &nbsp; &nbsp;public WorkerThread(String name, Channel channel) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp; &nbsp; &nbsp; &nbsp;this.channel = channel;
 &nbsp;  }
​
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;while (true) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Request request = channel.takeRequest();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;request.execute();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
}

ChannelTest

public class ChannelTest {
​
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;Channel channel = new Channel(5); &nbsp; // 工人线程的个数
 &nbsp; &nbsp; &nbsp; &nbsp;channel.startWorkers();
 &nbsp; &nbsp; &nbsp; &nbsp;new ClientThread("Alice", channel).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new ClientThread("Bobby", channel).start();
 &nbsp; &nbsp; &nbsp; &nbsp;new ClientThread("Chris", channel).start();
​
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(30000);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;System.exit(0);
 &nbsp;  }
}

Worker Thread 模式中的登场角色

  • Client (委托者) : 角色创建表示工作请求的 Request 角色并将其传递给 Channel角色。

  • Channel (通信线路) : Channel角色接收来自于 Client角色的 Request角色, 并将其传递给 Worker 角色。

  • Worker (工人) : Worker角色从 Channel角色, 并进行工作。当一项工作完成后, 它会继续去获取另外的 Request角色。

  • Request (请求) : Request 角色是表示工作的角色, 并进行工作。当一项工作完成后, 它会继续去获取另外的 Request角色。

什么场景下使用 Worker Thread模式

  • 提高吞吐量

    如果可以将自己的工作交给其他人, 那么自己就可以做下一项工作。线程也是一样的。如果将工作交给其他线程, 自己就可以做下一项工作(Thread-Per-Message)。

    由于启动新线程需要花费时间, 所以 Worker Thread 模式的主题之一就是通过轮流地和反复地使用线程来提高吞吐量。

  • 容量控制

    Worker Thread模式还有另外一个主题, 那就是可以同时提供的服务的数量, 即容量的控制。

    • Worker 角色的数量

      Worker角色的数量是可以自由地定义的。在示例程序中, 传递给 Channel的构造函数的参数 threads 即表示这个数值。Worker 角色会创建 threads 个 WorkerThread 的实例。

      Worker角色的数量越多, 可以并发进行的处理也越多。但是, 即使 Worker角色的数量超过了 同时被请求的工作数量,也不会对提高程序处理效率有什么帮助。因为多余的 Worker角色不但不会工作, 还会占有内存。增加容量就会增加消耗的资源, 所以必须根据程序实际运行的环境来相应地调整 Worker角色的数量。

      1. 最开始只有几个 Worker 角色

      2. 当工作增加时就增加 Worker 角色

      3. 但是, 如果增加得太多会导致内存耗尽, 因此到达极限后就不再增加 Worker角色

      4. 反之,当工作减少(即等待工作的 Worker角色增加)时, 就要逐渐减少 Worker角色

    • Request 角色的数量

      Channel角色中保存着 Request角色。 只要Worker角色不断地进行工作, 在 Channel角色中保存的 Request角色就不会增加很多。不过, 当接收到的工作的数量超出了 Worker角色的处理能力后, Channel角色中就会积累很多 Request角色。这时, Client角色必须等待一段时间才能将 Request角色发送给 Channel角色。

      如果 Channel角色可以保存很多 Request角色, 那么就可以填补(缓冲) Client角色与 Worker角色之间的处理速度差异。

  • 调用与执行的分离

    Client 角色负责发送工作请求。它会将工作内容封装为 Request角色, 然后传递给 Channel 角色。在普通的方法调用中, 这部分相当于 "设置参数并调用方法"。其中, "设置参数" 与 "创建 Request角色"相对应, 而 "传递给 Channel角色" 大致与 "调用方法"相对应。

    Worker角色负责进行工作。它使用从 Channel角色接收到的 Request角色来执行实际的处理。在普通的方法调用中, 这部分相当于 "执行方法"。

    在进行普通的方法调用时, "调用方法" 和 "执行方法" 是连续进行的。因为调用方法后, 方法会立即执行。在普通的方法调用中, 调用与执行是无法分开的。

    Worker Thread 模式 和 Thread-Per-Message模式中, 方法的调用和方法的执行是特意被分开的。方法的调用被称为 invocation,方法的执行被称为 execution。因此, 可以说 Worker Thread模式和 Thread-Per-Message模式将方法的调用和执行 分离开了。调用与执行的分离同时也是 Command模式。

    • 提高响应速度

      如果调用和执行不可分离, 那么当执行需要花费很长时间时, 就会拖调用处理的后腿。但是如果将调用和执行分离, 那么即使执行需要花费很长时间也没有什么关系, 因为执行完调用处理的一方可以先继续执行其他处理, 这样就可以提高响应速度。

    • 控制执行顺序 (调度)

      如果调用和执行不可分离, 那么在调用后就必须开始执行。

      但是如果将调用和执行分离, 执行就可以不再受调用顺序的制约。我们可以通过设置 Request角色的优先级, 并控制 Channel 角色 将 Request角色传递给Worker角色的顺序来实现上述处理。

    • 可以取消和反复执行

      将调用和执行分离后, 还可以实现 "即使调用了也可以取消执行" 这种功能。

      由于调用的结果是 Request角色对象, 所以既可以将 Request角色保存, 又可以反复地执行。

    • 通往分布式之路

      将调用和执行分离后, 可以将负责调用的计算机与负责执行的计算机分离开来, 然后通过网络将扮演 Request角色的对象从一台计算机传递至另外一台计算机。

  • Runnable 接口的意义

    java.lang.Runnable接口有时会被用作 Worker Thread模式中的 Request角色。也就是说, 该模式会创建一个实现了 Runnable接口的类的实例对象(Runnable对象) 来表示工作内容, 然后将它传递给 Channel角色,让其完成这项工作。

  • 多态 Request 角色

    在示例程序中, ClientThread传递给 Channel的只是 Request的实例。但是, WorkThread并不知道 Request类的详细信息。WorkerThread只是单纯的接收 Request的实例, 然后调用它的 execute方法而已。

    也就是说 , 即使我们编写了一个 Request类的子类并将它的实例传递给了 Channel, WorkThread也可以正常地调用 execute方法。用面向对象的术语来说, 就是这里使用了多态性。

    Request类是表示工作的类, 编写 Request类的子类相当于增加工作的种类。我们就实现了具有多态的 Request角色。

    Request角色中包含了完成工作所必需的全部信息。因此, 即使我们实现了多态的 Request角色并增加了工作的种类, 也无需修改 Channel角色和 Worker角色。这是因为即使工作种类增加了, worker角色依然只是调用 execute方法而已。

  • 独自一人的 Worker 角色

    请大家想象一下工人线程(Worker角色只有一个的情况。)当工人线程只有一个时, 由于工人线程进行处理的范围变成了单线程, 所以会有互斥处理可以省略的可能性。

Future 模式

什么是 Future 模式

Future 的意思是 未来、期货。假设有一个方法需要花费很长时间才能获取运行结果。那么, 与其一直等待结果, 不如先拿一张 "提货单"。获取提货单并不耗费时间。这里的 "提货单" 我们称为 Future角色。

获取 Future角色的线程会在稍后使用 Future角色来获取运行结果。这与凭着提货单去取蛋糕非常相似。如果运行结果已经出来了,那么直接领取即可; 如果运行结果还没有出来, 那么需要等待结果出来。

示例程序

Data

public interface Data {
 &nbsp; &nbsp;public abstract String getContent();
}

FutureData

public class FutureData extends FutureTask<RealData> implements Data {
 &nbsp; &nbsp;public FutureData(Callable<RealData> callable) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(callable);
 &nbsp;  }
 &nbsp; &nbsp;public String getContent() {
 &nbsp; &nbsp; &nbsp; &nbsp;String string = null;
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;string = get().getContent();
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  } catch (ExecutionException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;return string;
 &nbsp;  }
}

Host

public class Host {
 &nbsp; &nbsp;public FutureData request(final int count, final char c) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp;  request(" + count + ", " + c + ") BEGIN");
​
 &nbsp; &nbsp; &nbsp; &nbsp;// (1) 创建FutureData的实例
 &nbsp; &nbsp; &nbsp; &nbsp;// &nbsp; &nbsp; (向构造函数中传递 Callable<RealData>)
 &nbsp; &nbsp; &nbsp; &nbsp;FutureData future = new FutureData(
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;new Callable<RealData>() {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;public RealData call() {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;return new RealData(count, c);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  );
​
 &nbsp; &nbsp; &nbsp; &nbsp;// (2) 启动一个新线程,用于创建RealData的实例
 &nbsp; &nbsp; &nbsp; &nbsp;new Thread(future).start();
​
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp;  request(" + count + ", " + c + ") END");
​
 &nbsp; &nbsp; &nbsp; &nbsp;// (3) 返回FutureData的实例
 &nbsp; &nbsp; &nbsp; &nbsp;return future;
 &nbsp;  }
}
​

RealData

public class RealData implements Data {
 &nbsp; &nbsp;private final String content;
 &nbsp; &nbsp;public RealData(int count, char c) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp; &nbsp; &nbsp;  making RealData(" + count + ", " + c + ") BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;char[] buffer = new char[count];
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < count; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;buffer[i] = c;
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(100);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(" &nbsp; &nbsp; &nbsp;  making RealData(" + count + ", " + c + ") END");
 &nbsp; &nbsp; &nbsp; &nbsp;this.content = new String(buffer);
 &nbsp;  }
 &nbsp; &nbsp;public String getContent() {
 &nbsp; &nbsp; &nbsp; &nbsp;return content;
 &nbsp;  }
}

HostTest

public class HostTest {
​
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;Host host = new Host();
 &nbsp; &nbsp; &nbsp; &nbsp;Data data1 = host.request(10, 'A');
 &nbsp; &nbsp; &nbsp; &nbsp;Data data2 = host.request(20, 'B');
 &nbsp; &nbsp; &nbsp; &nbsp;Data data3 = host.request(30, 'C');
​
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main otherJob BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(2000);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main otherJob END");
​
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("data1 = " + data1.getContent());
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("data2 = " + data2.getContent());
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("data3 = " + data3.getContent());
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main END");
 &nbsp;  }
}

Future 模式中的登场角色

  • Client : Client角色向 Host 角色发出请求(request) , 并会立即接收到请求的处理结果(返回值) ---VirtualData角色

  • Host: Host角色会创建新的线程, 并开始在新线程中创建 RealData角色。同时, 它会将 Future角色返回给 Client角色。

  • VirtualData(虚拟数据) : VirtualData角色是让 Future角色与 RealData角色具有一致性的角色。

  • RealData(真实数据): 表示真实数据的角色。创建该对象需要花费很多时间。

  • Future : Future 角色是 RealData角色的 "提货单", 由 Host角色传递给 Client角色。Future角色就是 VirtualData角色。实际上, 当 Client角色操作Future角色时, 线程会调用 wait方法等待, 直至 RealData角色创建完成。但是, 一旦 RealData角色创建完成, 线程就不会再继续等待。Future角色会将 Client角色的操作委托给 RealData角色。

什么场景下使用Future模式

  • 吞吐量会提高吗

  • 异步方法调用的 "返回值"

  • "准备返回值" 和 "使用返回值" 的分离

  • 变种 --- 不让主线程久等的 Future 角色

  • 变种 --- 会发生变化的 Future 角色

  • 谁会在意多线程呢? "可复用性"

  • 回调 与 Future 模式

Two-phase Termination(两阶段终止) 模式

什么是 Two-phase Termination

小孩子在玩玩具时经常会将玩具弄得满房间都是。晚上到了睡觉时间, 妈妈就会对小孩子说: "先收拾房间再睡觉噢", 这时, 小孩子会开始打扫房间。

它是一种先执行完终止处理再终止线程的模式。

示例程序

CountUpThread

package code.rocky.twoPhaseTermination;
​
public class CountUpThread extends Thread {
 &nbsp; &nbsp;// 计数值
 &nbsp; &nbsp;private long counter = 0;
​
 &nbsp; &nbsp;// 终止请求
 &nbsp; &nbsp;public void shutdownRequest() {
 &nbsp; &nbsp; &nbsp; &nbsp;interrupt();
 &nbsp;  }
​
 &nbsp; &nbsp;// 线程体
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;while (!isInterrupted()) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;doWork();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  } finally {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;doShutdown();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;// 操作
 &nbsp; &nbsp;private void doWork() throws InterruptedException {
 &nbsp; &nbsp; &nbsp; &nbsp;counter++;
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("doWork: counter = " + counter);
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(500);
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;// 终止处理
 &nbsp; &nbsp;private void doShutdown() {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("doShutdown: counter = " + counter);
 &nbsp;  }
}
​

CountUpThreadTest

package code.rocky.twoPhaseTermination;
​
public class CountUpThreadTest {
​
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main: BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 启动线程
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;CountUpThread t = new CountUpThread();
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;t.start();
​
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 稍微间隔一段时间
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(10000);
​
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 线程的终止请求
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main: shutdownRequest");
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;t.shutdownRequest();
​
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main: join");
​
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;// 等待线程终止
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;t.join();
 &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("main: END");
 &nbsp;  }
}
​
​

Two-Phase Termination 模式中的登场角色

  • TerminationRequester(终止请求发出者) : TerminationRequester角色负责向 Terminator角色发出终止请求。

  • Terminator(终止者) : Terminator角色负责接收终止请求, 并实际执行终止处理。它提供了表示终止请求的 shutdownRequest方法不需要使用 Single Threaded Execution

什么场景下使用 Two-Phase Termination

  • 不能使用 Thread类的 stop 方法

    java.lang.Thread类提供了用于强制终止线程的 stop 方法。但是 stop 是 "不推荐使用的方法",我们不应当使用它。

    因为如果使用 stop 方法, 实例的安全性就无法确保。使用 stop 方法后, 线程会在抛出 java.lang.ThreadDeath异常后终止。即使该线程正处于访问临界区的过程中也会终止。

  • 仅仅检查标志是不够的

    因为当想要终止线程时, 该线程可能正在 sleep。而当线程正在 sleep 时, 即使将 shutdownRequest标志设置为 true, 线程也不会开始终止处理。等到 sleep 时间过后, 线程可能会在某个时间点开始终止处理, 但是这样程序的响应性就下降了。如果使用 interrupt方法的话, 就可以中断 sleep。

    另外, 线程当时也可能正在 wait。而当线程正在 wait 时, 即使将 shutdownRequested标志设为true, 线程也不会从等待队列中出来, 所以我们必须使用 interrupt方法对线程下达 中断wait的指标。

  • 仅仅检查中断状态是不够的

    调用interrupt方法后,如果线程正在 sleep或是 wait, 那么会抛出 InterruptedExecption异常,而如果不抛出异常, 线程就会变为中断状态。也就是说, 没有必要特意准备一个新的 shutdownRequested标志。只要捕获 InterruptedException, 使用 isInterrupted方法来检查线程是否处于中断状态不就可以了么

  • 在长时间处理前检查终止请求

    为了能够在接受到终止请求后立即开始终止处理, 我们应当在执行长时间处理前检查 shutdownRequested标志或是调用 isShutdownRequested方法。

  • join 方法 和 isAlive方法

    isAlive方法来确定指定的线程是否已经终止。如果返回值是true, 则表示该线程还活着; 如果返回值是false, 则表示该线程已经终止。使用 java.lang.ThreadgetState方法也可以获取线程的状态, 不过如果只是检查线程是否已经终止, 使用 isAlive会更好。

  • java.util.concurrent.ExecutorService 接口 与 Two-Phase Termination模式

  • 要捕获程序整体的终止时

    • 未捕获的异常的处理器

    • 退出钩子

  • 优雅地终止线程

    "线程优雅地执行终止处理, 然后终止运行" 这种状态用英语单词来形容的话, 就是Graceful(优雅的、高贵的、得体的)。这种状态相当于工作的结束并不是慌慌张张地放下已经着手的工作不管, 而是在进行必要的整理后才正式终止。

    • 安全地终止(安全性)

      即使接收到终止请求, 线程也不会立即终止。首先表示是否已经接收到终止请求的 shutdownRequested标志会被设置为 true。然后, 仅在线程运行至不会破坏对象安全性的位置时, 程序才会开始终止处理。

    • 必定会进行终止处理(生存性)

      线程在接收到终止请求后, 会中断可以中断的wait, 转入终止处理。为此, shutdownRequest方法会调用 interrupt方法。

      另外, 为了确保在抛出异常后程序也会执行终止处理, 我们使用了 try… finally 语句块。

    • 发出终止请求后尽快进入终止线程(响应性)

      线程在接收到终止请求后, 会中断可以中断的sleep, 尽快进入终止处理。为此, shutdownRequest 方法会调用 interrupt 方法。

      另外, 在执行长时间处理前需要检查 shutdownRequested标志。

Thread Specific Storage(线程特有存储) 模式

什么是 Thread Specific Storage

就是“线程独有的存储库”,该模式会对每个线程提供独有的内存空间。java.lang.ThreadLocal类提供了该模式的实现,ThreadLocal的实例是一种集合(collection)架构,该实例管理了很多对象,可以想象成一个保管有大量保险箱的房间。

ThreadLocal 简介

见 我的 ThreadLocal 解读

示例程序

Log

public class Log {
 &nbsp; &nbsp;private static PrintWriter writer = null;
​
 &nbsp; &nbsp;// 初始化writer字段
 &nbsp; &nbsp;static {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;writer = new PrintWriter(new FileWriter("log.txt"));
 &nbsp; &nbsp; &nbsp;  } catch (IOException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;// 写日志
 &nbsp; &nbsp;public static void println(String s) {
 &nbsp; &nbsp; &nbsp; &nbsp;writer.println(s);
 &nbsp;  }
​
 &nbsp; &nbsp;// 关闭日志
 &nbsp; &nbsp;public static void close() {
 &nbsp; &nbsp; &nbsp; &nbsp;writer.println("==== End of log ====");
 &nbsp; &nbsp; &nbsp; &nbsp;writer.close();
 &nbsp;  }
}
​
​

LogTest

​
public class LogTest {
​
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < 10; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Log.println("main: i = " + i);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(100);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;Log.close();
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println("END");
 &nbsp;  }
}

使用 Thread-Specific-Storage模式 示例程序

ClientThread

public class ClientThread extends Thread {
 &nbsp; &nbsp;public ClientThread(String name) {
 &nbsp; &nbsp; &nbsp; &nbsp;super(name);
 &nbsp;  }
 &nbsp; &nbsp;public void run() {
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(getName() + " BEGIN");
 &nbsp; &nbsp; &nbsp; &nbsp;for (int i = 0; i < 10; i++) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Log.println("i = " + i);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;Thread.sleep(100);
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  } catch (InterruptedException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp;  }
 &nbsp; &nbsp; &nbsp; &nbsp;Log.close();
 &nbsp; &nbsp; &nbsp; &nbsp;System.out.println(getName() + " END");
 &nbsp;  }
}

Log

public class Log {
 &nbsp; &nbsp;private static final ThreadLocal<TSLog> tsLogCollection = new ThreadLocal<TSLog>();
​
 &nbsp; &nbsp;// 写日志
 &nbsp; &nbsp;public static void println(String s) {
 &nbsp; &nbsp; &nbsp; &nbsp;getTSLog().println(s);
 &nbsp;  }
​
 &nbsp; &nbsp;// 关闭日志
 &nbsp; &nbsp;public static void close() {
 &nbsp; &nbsp; &nbsp; &nbsp;getTSLog().close();
 &nbsp;  }
​
 &nbsp; &nbsp;// 获取线程特有的日志
 &nbsp; &nbsp;private static TSLog getTSLog() {
 &nbsp; &nbsp; &nbsp; &nbsp;TSLog tsLog = tsLogCollection.get();
​
 &nbsp; &nbsp; &nbsp; &nbsp;// 如果该线程是第一次调用本方法,就新生成并注册一个日志
 &nbsp; &nbsp; &nbsp; &nbsp;if (tsLog == null) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;tsLog = new TSLog(Thread.currentThread().getName() + "-log.txt");
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;tsLogCollection.set(tsLog);
 &nbsp; &nbsp; &nbsp;  }
​
 &nbsp; &nbsp; &nbsp; &nbsp;return tsLog;
 &nbsp;  }
}
​

TSLog

public class TSLog {
 &nbsp; &nbsp;private PrintWriter writer = null;
​
 &nbsp; &nbsp;// 初始化writer字段
 &nbsp; &nbsp;public TSLog(String filename) {
 &nbsp; &nbsp; &nbsp; &nbsp;try {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;writer = new PrintWriter(new FileWriter(filename));
 &nbsp; &nbsp; &nbsp;  } catch (IOException e) {
 &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;e.printStackTrace();
 &nbsp; &nbsp; &nbsp;  }
 &nbsp;  }
​
 &nbsp; &nbsp;// 写日志
 &nbsp; &nbsp;public void println(String s) {
 &nbsp; &nbsp; &nbsp; &nbsp;writer.println(s);
 &nbsp;  }
​
 &nbsp; &nbsp;// 关闭日志
 &nbsp; &nbsp;public void close() {
 &nbsp; &nbsp; &nbsp; &nbsp;writer.println("==== End of log ====");
 &nbsp; &nbsp; &nbsp; &nbsp;writer.close();
 &nbsp;  }
}
​

ClientThreadTest

public class ClientThreadTest {
 &nbsp; &nbsp;public static void main(String[] args) {
 &nbsp; &nbsp; &nbsp; &nbsp;new ClientThread("Alice").start();
 &nbsp; &nbsp; &nbsp; &nbsp;new ClientThread("Bobby").start();
 &nbsp; &nbsp; &nbsp; &nbsp;new ClientThread("Chris").start();
 &nbsp;  }
}

Thread-Specific Storage 模式中的登场角色

  • Client(委托者) : Client角色将处理委托给TSObjectProxy角色。一个 TSObjectProxy角色会被多个Client角色使用。

  • TSObjectProxy(线程特有的对象的代理人): TSObjectProxy角色会执行多个Client角色委托给它的处理。首先, TSObjectProxy角色使用 TSObjectCollection角色获取与Client角色对应的 TSObjet角色。

  • TSObjectCollection(线程特有的对象的集合):TSObjectCollection角色有一张Client角色与TSObject角色之间的对应表。 当getTSObject方法被调用后, 它会去查看对应表, 返回与 Client角色相对应的 TSObject角色。另外, 当 setTSObject方法被调用后, 它会将 Client角色与 TSObject角色之间的键值对应关系设置到对应表中。

  • TSObject(线程特有对象): TSObject角色中保存着线程特有的信息。TSObject角色由TSObjectCollection角色管理。TSObject角色的方法只会被单线程调用。

什么场景下使用 Thread-Specific Storage

  • 局部变量与 java.lang.ThreadLocal

    线程本来都是有自己特有的存储空间的, 即用于保存方法的局部变量的栈。方法中定义的局部变量属于该线程特有, 其他线程无法访问它们。但是,这些变量在方法调用结束后就会消失。而ThreadLocal则与方法调用无关,它是一个用于为线程分配特有的存储空间的类。

  • 保存线程特有的信息的位置

    线程特有的信息的"保存位置"有以下两种。

    1. 线程外(thread-external)

    2. 线程内(thread-internal)

    • 在线程外保存线程特有的信息

      TSLOG的实例都被保存在Log类中的java.lang.ThreadLocal的实例中。

      TreadLocal的实例就是存储间, 各个线程的储物间内。线程并不会背着储物柜四处走动。像这样, 将线程持有的信息保存在线程外部的方法称为 "线程外"。

    • 在线程内保存线程特有的信息

      编写一个 Thread 类的子类 -- MyThread。如果在 MyThread中声明字段, 该字段就是线程特有的信息。这就是线程内保存线程特有的信息。

  • 不必担心其他线程访问

    Thread-Specific Storage是"线程特有的存储"的意思。

    不会被其他线程随意访问 这一特性非常重要。这是因为,在多线程编程中,互斥处理非常重要,但是优雅地执行互斥处理却非常困难。

    Thread-Specific Storage模式为我们提供了一种以线程作为键,让每个线程只能访问它特有的对象的机制。该对象是以线程为单位来保存的,绝对不用担心其他线程会访问该对象。

  • 吞吐量的提高很大程度上取决于实现方式

    Thread-Specific Storage 模式并没有执行互斥处理。因此,这很容易让人误解为与使用 Single Threaded Execution模式相比, 此时的吞吐量会有所提高。但是, 事实并非一定如此。原因如上文所述, 可能 TSObjectCollection角色中执行了隐藏的互斥处理。此外, 每次通过 TSObjectProxy角色调用方法时, 使用 TSObjectCollection角色都会产生额外的性能开销。

    Thread-Specific Storage更看重如下所示的可复用性。

    1. 不改变结构即可实现程序

    2. 没有显式地执行互斥处理, 所以编程时犯错的可能性较小

  • 上下文的危险性

    Thread-Specific Storage模式中, TSObjectCollection角色会自动判断当前的线程。也就是说, 我们没有必要将线程的相关信息通过参数传递给 TSObjectCollection角色。这相当于在程序中引入了上下文。上下文虽然很方便,但是也有一定的危险性。因为开发人员看不到处理中所使用的信息。

基于角色与基于任务

基于角色的方式即在表示线程的实例中保存进行工作所必需的信息(上下文、状态)。这样可以减少和减轻线程之间的交互信息量。一个线程会使用从其他线程接收到的信息来执行处理, 改变自己的内部状态。通常,我们称这样的线程为角色。

基于任务的方式不在线程中保存信息(上下文、状态)。在这种方式下, 这些信息不保存在线程中,而是保存在线程之间交互的实例中。而且, 不仅是数据, 连用于执行请求的方法都定义在其中。像这样在线程之间交互的实例可以称为消息、请求或是命令。这里我们暂且称其为任务。

Active Object(主动对象) 模式

Active Object 的简介

Active 是 "主动的"的意思, 因此 Active Object 就是"主动对象"的意思。所谓"主动的", 一般指 "有自己特有的线程"。因此,举例来说,java 的 java.lang.Thread类的实例就是一种主动对象。

不过,在 Active Object 模式中出场的主动对象可不仅仅 "有自己特有的线程"。它同时还具有可以从外部接收和处理异步消息并根据需要返回处理结果的特征。

Active Object 模式中的主动对象会通过自己特有的线程在合适的时机处理从外部接收到的异步消息。

示例程序

见code-multithread-pattern

Active Object 模式中的登场角色

  • Client(委托者) : Client角色调用 ActiveObject角色的方法来委托处理。它能够调用的只有 ActiveObject角色提供的方法。调用这些方法, (如果 ActivationQueue 角色没有满) 程序控制权会立即返回。

    虽然 Client角色只知道 ActiveObject角色,但它实际调用的是Proxy角色。

    Client角色在获取处理结果时, 会调用 VirtualResult角色的 getResultValue方法。这里使用了 Future模式。

  • ActiveObject(主动对象) : Active Object角色定义了主动对象向Client角色提供的接口(API )。

    ActiveObject角色定义了主动对象向 Client角色提供的接口(API)。

  • Proxy(代理人) : Proxy角色负责将方法调用转换为 MethodRequest角色的对象。转换后的 MethodRequest角色会被传递给 Scheduler角色。

    Proxy角色实现了 ActiveObject角色提供的接口(API)。

    调用 Proxy角色的方法的就是 Client角色。将方法调用转换为 MethodRequest角色, 并传递给 Scheduler 角色的操作都是使用 Client角色的线程进行的。

  • Scheduler: Scheduler角色负责将 Proxy角色传递来的 MethodRequest角色传递给 ActivationQueue角色, 以及从 ActivationQueue角色取出并执行 MethodRequest角色这两项工作。

    Client角色的线程负责将 MethodRequest传递给 ActivationQueue角色。

    而从ActivationQueue角色取出并执行MethodRequest角色这项工作则是使用Scheduler角色自己的线程进行的。在Active Object模式中,只有使用 Client 角色和 Scheduler 角色时才会启动新线程。

    Scheduler角色会把 MethodRequest角色放入 ActivationQueue 角色或者从ActivationQueue角色取出 MethodRequest 角色。

    因此,Scheduler 角色可以判断下次要执行哪个请求。如果想实现请求调度的判断逻辑,可以将它们实现在Scheduler角色中。也正是因为如此,我们才将其命名为Scheduler。

  • MethodRequest

    MethodRequest角色是与来自Client角色的请求对应的角色。MethodRequest定义了负责执行处理的 Servant角色,以及负责设置返回值的Future角色和负责执行请求的方法(everto)

    MethodRequest角色为主动对象的接口(API)赋予了对象的表象形式。

  • ContreteMethodRequest

    ConcreteMethodRequest角色是使MethodRequest角色与具体的方法相对应的角色。Active Object角色中定义的每个方法,会有各个类与之对应,比如MethodAlphaRequest。

    • Servant(仆人)

      Servant角色负责实际地处理请求。

      调用Servant角色的是Scheduler角色的线程。Scheduler角色会从ActivationQueue角色取出一个MethodRequest角色(实际上是ConcreteMethodRequest角色)并执行它。此时,Scheduler 角色调用的就是Servant角色的方法。

      Servant角色实现了ActiveObject角色定义的接口(API )。

      Proxy角色会将请求转换为MethodRequest角色,而Servant角色则会实际地执行该请求。Scheduler角色介于Proxy角色和Servant 角色之间,负责管理按照什么顺序执行请求。

    • ActivationQueue (主动队列)

      ActivationQueue角色是保存MethodRequest角色的类。

      调用putRequest方法的是Client角色的线程,而调用takeRequest方法的是Scheduler角色的线程。这里使用了Producer-Consumer模式。

    • VirtualResult(虚拟结果)

      VirtualResult角色与Future角色、RealResult角色共同构成了Future模式。

      Client角色在获取处理结果时会调用VirtualResult角色(实际上是Future角色)的getResultvalue方法。

    • Future(期货)

      Future角色是 Client角色在获取处理结果时实际调用的角色。当处理结果还没有出来的时候,它会使用Guarded Suspension模式让 Client角色的线程等待结果出来。

    • RealResult(真实结果)

      RealResult角色是表示处理结果的角色。Servant角色会创建一个RealResult角色作为处理结果,然后调用Future角色的setRealResult方法将其设置到Future角色中。

什么场景下 使用 Active Object

  • 到底做了些什么事情

    1. 定义了接口( API): 由Active Object 角色定义API

    2. 接收异步消息:Proxy 角色将方法调用转换为MethodRequest 角色后保存在ActivationQueue角色中

    3. 与Client角色运行于不同的线程:由 Scheduler角色提供线程

    4. 执行处理:由Servant角色单线程执行处理

    5. 返回返回值:Future角色是返回值的提货单

  • 运用模式时需要考虑问题的粒度

    Active Object模式的组成要素众多,是一个非常庞大的模式。因此,在运用该模式时,必须注意问题的粒度。所谓问题的粒度,是指问题的大小,也就是用于解决问题的每个处理到底有多大。

  • 关于并发性

    Proxy 角色即使被多个线程调用也没有问题({ concurrent } )

    Servant角色只能被一个线程调用( { sequential } )

  • 增加方法

  • Scheduler 角色的作用

    在[POSA2]中,Scheduler 角色如下。首先,各ConcreteMethodRequest 角色会定义guard方法。接着,如果可以执行ConcreteMethodRequest 角色,就让guard方法返回true。仅当guard方法的返回值是true时,Scheduler角色才会调用ConcreteMethodRequest角色的execute方法”。这样,ConcreteMethodRequest角色的守护条件就可以整合在它们各自的方法中了。.

  • 主动对象之间的交互

    可以编写多个主动对象,然后让它们之间互相交互。也就是说,Servant角色会调用其他 ActiveObject 角色的方法。

  • 通往分布式-从跨越线程界线变为跨越计算机界线

    这个方法运行于哪个线程呢? 在Active Object模式中,“方法的调用”的部分运行于Client角色的线程中,“方法的执行”部分运行于Scheduler角色的线程中。

    这里其实也是“调用与执行的分离”: 执行invocation的线程(Client角色)与执行execution 的线程(Scheduler角色)被分离开了。

    如果将线程分离开来,那么就可以很容易地将线程运行于的计算机也分离开来,即将执invocation的机器与执行execution的计算机分离开,然后用网络将它们连接起来。那么网络之间互相传输的是什么呢?对,就是 MethodRequest角色和Result角色。

    由于方法的调用和设置返回值都已经被转换为了对象这种“有形的东西”,所以可以通过网络交互。这可以说是“从跨越线程界线变为了跨越计算机界线”。

    http: / / docs.oracle.com/javase/8/docs/technotes/qguides/rmi/

其他有关的设计模式

看到的一些其他的多线程设计模式, 罗列在下面。欢迎小伙伴们反馈噢~

Promise (承诺)模式

Serial Thread Confinement(串行线程封闭) 模式

Master-Slave(主仆) 模式

Pipeline(流水线) 模式

Half-sync/ Half-async(半同步/半异步)模式

参考文献

  1. 参考1: https://blog.csdn.net/weixin_30399797/article/details/95064835

  2. 参考2: https://zhuanlan.zhihu.com/p/543255670

  3. 参考3: http://www.manongjc.com/detail/21-xpewmalhgirntda.html

  4. 参考4: 多线程与设计模式PDF

  5. 参考5: 多线程与设计模式代码仓库地址https://www.ituring.com.cn/book/1812

内容大纲
批注笔记
多线程设计模式
ArticleBot
z
z
z
z
主页
Git管理
文章
云文档
留言墙
AI文档