为什么需要线程通信

  • 多个线程并发执行时, 在默认情况下CPU是随机切换线程的,当我们需要多个线程来共同完成一件任务,并且我们希望他们有规律的执行, 那么多线程之间需要一些协调通信,以此来帮我们达到多线程共同操作一份数据。
  • 当然如果我们没有使用线程通信来使用多线程共同操作同一份数据的话,虽然可以实现,但是在很大程度会造成多线程之间对同一共享变量的争夺,那样的话势必为造成很多错误和损失!

什么是线程通信

线程通信的目标是使线程间能够互相发送信号

基于共享内存的通信

volatile

  • 利用关键字volatile进行数据共享。
  • 线程循环判断volatile修饰的变量的值。
  • volatile可以保证数据的可见性,但不保证操作的原子性。
    • ++i和i++就不是原子操作。
  • 更多详见java基础 volatile详解

synchronized+单例+volatile

  • 利用单例对象进行对象共享
  • 在设置值时,synchronized修饰方法,保证方法上的线程安全,从而保证数据的原子性。
  • volatile的属性,保证实时能拿到最新值。
  • 线程循环判断对象的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class Demo{
private static Demo demo = new Demo();
private volatile String name;
private volatile String age;
private Demo(){};
public Demo getInstance(){
return demo;
}
public String getName() {
return name;
}
public synchronized void setName(String name) {
this.name = name;
}
public String getAge() {
return age;
}
public synchronized void setAge(String age) {
this.age = age;
}
}

内部类

  • 在类中创建内部类去继承Thread
  • 暴露创建实例化内部类的方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Demo{
volatile int index = 0;

public static void main(String[] args) {
Demo demo = new Demo();
demo.getThread("1").start();
demo.getThread("2").start();
demo.getThread("3").start();
}

private class DemoThread extends Thread{
@Override
public void run() {
while(true){
try {
sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + "is running and index is " + index++);
}
}
public DemoThread(String name) {
super(name);
}
}

public Thread getThread(String name) {
return new DemoThread(name);
}
}

管道

在java的类库中有2个管道类:PipedWriter/PipedInputStream(允许任务向管道写),和PipedReader/PipedOutputStream(允许不同任务从同一管道中读取)。管道也可以理解为一个缓冲区,将要读写的内容存入到管道,输入输出都要从这个管道去操作,管道提供了一个封装好的解决方案。
通知的线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class Sender implements Runnable {
private PipedOutputStream pipedOutputStream;
public Sender(PipedOutputStream pipedOutputStream) {
this.pipedOutputStream = pipedOutputStream;
}
@Override
public void run(){
try {
String str = "ABCDEFGHIJKLMNOPQRSTUVWXYZ";
byte[] bytes = str.getBytes();
byte[] limitBytes = new byte[1024];
int pages = bytes.length/limitBytes.length;
for (int i = 0; i < pages; i++) {
for (int j = 0; j < 1024; j++) {
limitBytes[j] = bytes[i*1024+j];
}
pipedOutputStream.write(limitBytes,0,1024);
}
//最后一段
for (int i = 0; i < bytes.length-pages*1024; i++) {
limitBytes[i] = bytes[pages*1024+i];
}
pipedOutputStream.write(limitBytes,0,bytes.length-pages*1024);
pipedOutputStream.flush();
pipedOutputStream.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}

被通知的线程,在管道未写完之前,获取是堵塞状态,等待管道写入数据。
在管道写入结束后,需关闭管道,不然会抛java.io.IOException: Write end dead

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class Receiver implements Runnable {
private PipedInputStream pipedInputStream;
public Receiver(PipedInputStream pipedInputStream) {
this.pipedInputStream = pipedInputStream;
}
@Override
public void run() {
try {
byte[] bytes = new byte[1024];
int index = -1;
while((index = pipedInputStream.read(bytes))>-1){
System.out.println(new String(bytes,0,index).toString());
}
}catch (Exception e){
e.printStackTrace();
}
}
}

启动类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class Main {
public static void main(String[] args) throws Exception {
new Main().pipe();
}

public void pipe() throws Exception{
PipedOutputStream pipedOutputStream = new PipedOutputStream();
PipedInputStream pipedInputStream = new PipedInputStream();
pipedOutputStream.connect(pipedInputStream);
Sender sender = new Sender(pipedOutputStream);
Receiver receiver = new Receiver(pipedInputStream);
Thread threadS = new Thread(sender);
Thread threadR = new Thread(receiver);
threadR.start();
threadS.start();
}
}

基于线程调度的通信

wait、notify\notifyAll

  • jdk5之前
  • notify随机唤醒,并不能保证唤醒的正确性。
  • notifyAll唤醒全部,但是不能保证调用顺序。
  • wait会释放锁,notify\notifyAll不会释放锁,唤醒后需等当前线程执行完毕后才能再执行被唤醒的线程。

Condition

  • jdk5+
  • 以AQS非静态内部类的方式实现,因此Condition初始化的前提是先要有Lock实例,并且要先获取到锁。
  • Condition.await()同wait
  • Condition.signal()同notify
  • Condition.signalAll()同notifyAll
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class ConditionUseCase {
public Lock lock = new ReentrantLock();
public Condition condition = lock.newCondition();
public static void main(String[] args) {
ConditionUseCase useCase = new ConditionUseCase();
ExecutorService executorService = Executors.newFixedThreadPool (2);
executorService.execute(new Runnable() {
@Override
public void run() {
useCase.conditionWait();
}
});
executorService.execute(new Runnable() {
@Override
public void run() {
useCase.conditionSignal();
}
});
}
public void conditionWait() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "拿到锁了");
System.out.println(Thread.currentThread().getName() + "等待信号");
condition.await();
System.out.println(Thread.currentThread().getName() + "拿到信号");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
public void conditionSignal() {
try {
Thread.sleep(5000);
lock.lock();
System.out.println(Thread.currentThread().getName() + "拿到锁了");
condition.signal();
System.out.println(Thread.currentThread().getName() + "发出信号");
}catch (Exception e){
e.printStackTrace();
}finally {
lock.unlock();
}
}
}

外界原子性数据

  • redis,数据库,zk的同步锁等。