Java中线程同步辅助类
CountDownLatch类
CountDownLatch类,是用来线程同步辅助类,可用在某些线程工作前先完成某些动作或这某些线程工作完成后进行收尾工作。
主要的两个方法:
- countDown():使计数器减一
- await():当count为0时,等待的线程会被释放可以进行下一步操作
简单使用:
例如:有三个线程在进行爬取内容,有一个线程进行汇总。
public class CountDownLatchTest {
public static void main(String[] args) {
CountDownLatch cdl = new CountDownLatch(3);
new Thread(new Runable() {
@Override
public void run() {
try {
// 这里调用的是await()不是wait()
cdl.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("汇总所有爬取的内容");
}
}).start();
for(int i = 0; i < 3; i++) {
new Thread(new Runable() {
@Override
public void run() {
System.out.println("正在爬取第"+i+"个页面");
cdl.countDown();
}
}).start();
}
}
}
CyclicBarrier类
CyclicBarrier允许一组线程相互等待,直至到了公共屏障点,这组线程才可以继续执行下去。而且CycliBarrier可以被重用。
public class CyclicBarrierTest {
// 参与者数量
private static int parties = 3;
// 创建一个新的 CyclicBarrier,它将在给定数量的参与者(线程)处于等待状态时启动
private static CyclicBarrier barrier = new CyclicBarrier(parties);
public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < parties; i++)
new Thread(new Task()).start();
Thread.sleep(1000);
System.out.println("getNumberWaiting():" + barrier.getNumberWaiting());
}
static class Task implements Runnable {
public void run() {
try {
System.out.println(Thread.currentThread().getName() + "await");
// 在所有参与者都已经在此 barrier 上调用 await 方法之前,将一直等待。
barrier.await();
System.out.println(Thread.currentThread().getName() + "continued");
} catch (BrokenBarrierException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
Semaphore
Semaphore,其实就是信号量,内部保留一个计数count,用于控制同时访问线程个数。
主要两个方法:
- acquire():获取信号量,得到之后count减一,可以继续执行;否则会阻塞起来
- release():操作完成,需要释放信号量,count加1
举例:
银行有三个窗口,每次最多只能有三个人去办理业务。
public class SemaphoreTest {
public static void main(String[] args) {
int num = 10;
Semaphore semaphore = new Semaphore(3);
for(int i = 0; i < num; i++) {
final int person = i;
new Thread(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("第"+(person+1)+"位正在前台办理业务");
Thread.sleep(1000);
System.out.println("第"+person+"位办理完业务");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
Thread.sleep(10);
}
}
}
关于AQS(AbstractQueuedSynchronizer)
AQS位于java.util.concurrent.locks,是一个用来构建锁和同步器的框架,底层采用模板方法模式,自定义同步器时,只需要覆盖重写AQS中isHeldExclusively(),tryAcquire(int),tryRelease(int),tryAcquireShared(int),tryReleaseShared(int)这几个模板方法即可。
AQS使用一个int成员变量来表示同步状态,通过内置的FIFO队列来完成获取资源线程的排队工作。AQS使用CAS对该同步状态进行原子操作实现对其值的修改。
private volatile int state;//共享变量,使用volatile修饰保证线程可见性
状态信息通过procted类型的getState,setState,compareAndSetState进行操作
//返回同步状态的当前值
protected final int getState() {
return state;
}
// 设置同步状态的值
protected final void setState(int newState) {
state = newState;
}
//原子地(CAS操作)将同步状态值设置为给定值update如果当前同步状态的值等于expect(期望值)
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
以上三个同步辅助类均是继承AQS实现的。
AQS核心思想是,如果被请求的共享资源空闲,则将当前请求资源的线程设置为有效的工作线程,并且将共享资源设置为锁定状态。如果被请求的共享资源被占用,那么就需要一套线程阻塞等待以及被唤醒时锁分配的机制,这个机制AQS是用CLH队列(这是一个虚拟队列,采用节点相连)锁实现的,即将暂时获取不到锁的线程加入到队列中。
2019-03-30:看到一篇讲的很好的文章,简单明了的阐述AQS,传送门