乐观锁和悲观锁

悲观锁

悲观锁在写操作频繁的场景下表现较好:共享资源被多个线程频繁地修改,悲观锁可以避免大量的重试和回滚操作。

synchronized

  1. 同步实例方法

修饰实例方法,是对象级别的锁,同一时间1个对象只能有1个同步实例方法在执行;但是非同步方法无所谓。

1
public synchronized void decrement() {}
  1. 修饰类的静态方法

修饰静态方法,是类级别的锁,会锁定当前类的Class对象。同一时间只有一个线程可以执行该类的任何1个synchronized静态方法。

1
public static synchronized void incrementStatic() {}
  1. 同步代码块:同步代码块提供了更细粒度的控制,允许你指定一个对象作为锁,只对代码块内的特定代码进行同步。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class BlockCounter {
private int count = 0;
private final Object lock = new Object(); // 用于同步的锁对象
public void performOperation() {
synchronized (lock) { // 锁定 'lock' 对象
count++;
System.out.println(Thread.currentThread().getName() + " incremented count to: " + count);
}
}
public void performAnotherOperation() {
synchronized (this) { // 锁定当前实例对象
count--;
System.out.println(Thread.currentThread().getName() + " decremented count to: " + count);
}
}
}

ReentrantLock

  • 分为公平锁和非公平锁,构造函数传入true则为公平锁。
  • .lock->void.trylock()->boolean两种,前者阻塞,后者不阻塞;确保在finally中释放

乐观锁

乐观锁总是假设最好的情况,认为共享资源每次被访问的时候不会出现问题。

CAS

CompareAndSwap,有3个值:V值(当前内存的值)、E值(期望的V值为多少)、X值(要更新的值)。

会事先读取E值,然后原子性地“比较并交换”;当V值等于先前设置的E值时,才会成功执行。

java中的CAS依赖本地方法硬件层面的实现,即在Unsafe类中调用native方法,例如如下三个函数的签名:

1
2
3
4
// o和offset是用来读取内存中的当前值V,expected是内存中的预期值E,x是要设置的值X
public final native boolean compareAndSwapObject(Object o, long offset, Object expected, Object x);
public final native boolean compareAndSwapInt(Object o, long offset, int expected, int x);
public final native boolean compareAndSwapLong(Object o, long offset, long expected, long x);

AtomicInteger、AtomicBoolean、AtomicLong等原子类的底层就是用了unsafe的CAS操作,它会维护value字段的偏移量valueOffset,传递给上述的offset参数,它通过如下操作获得:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
private volatile int value; // 原子类的值
private static final long valueOffset; // value的偏移量,通过偏移量得到

static {
try {
// AtomicInteger.class.getDeclaredField("value") 通过反射获取类的字段,类型为Field
// unsafe中的方法签名:public native long objectFieldOffset(Field f);
valueOffset = unsafe.objectFieldOffset(AtomicInteger.class.getDeclaredField("value"));
} catch (Exception ex) { throw new Error(ex); }
}

public final int getAndIncrement() {
return unsafe.getAndAddInt(this, valueOffset, 1);
}

unsafe中的“获取并自增”函数中,会写一个循环,先volatile地获取E值,再尝试让他自增;只要在这个间隔里没有别人修改就能成功。

1
2
3
4
5
6
7
public final int getAndAddInt(Object o, long offset, int delta) {
int v;
do {
v = getIntVolatile(o, offset);
} while (!compareAndSwapInt(o, offset, v, v + delta));
return v;
}

然而,Atom类并不能解决ABA问题,需要在变量前面追加上版本号或者时间戳,例如AtomicStampedReference类,他就在内部定义了个Pair(包含值和时间戳stamp),它的cas操作都是针对pair的。但是参数要传的比较多。

1
2
3
4
5
6
public boolean compareAndSet(V expectedReference, V newReference, int expectedStamp, int newStamp) {
Pair<V> current = pair;
return expectedReference == current.reference && expectedStamp == current.stamp &&
((newReference == current.reference && newStamp == current.stamp) ||
casPair(current, Pair.of(newReference, newStamp)));
}

AQS

概述:全称AbstractQueuedSynchronizer,即抽象同步队列。AQS的本质是个锁,封装了共享资源的获取和释放的通用流程。他的实现类用于多种JUC提供的同步器的实现,例如ReentrantLock中的公平锁和非公平锁等等。

分类:支持独占锁(例如ReentrantLock)和共享锁(例如CountDownLatch、Semaphore、CyclicBarrier)两种模式;可以是公平的或非公平的锁。

包含如下数据结构:

  1. volatile修饰的state变量,往往使用CAS操作它,用于标记当前锁的状态,例如是否被占用、被多少个线程占用等等;
  2. 先进先出(FIFO)的双向队列,以及队列节点Node,用于存放等待中的线程;
  3. 父类AbstractOwnableSynchronizerexclusiveOwnerThread属性,即当前拥有锁的线程;

常用方法:

1
acquire()

ReentrantLock

以可重入锁ReentrantLock为例的话,整体流程为:

  1. 如果state为0,则尝试用CAS去修改state变量。如果CAS成功,则设置持有者为自己。
  2. 如果state不为0,则检查是否被自己持有。若是,则重入;否则排队等待,即进入acquire()方法
  3. 线程将自己封装成Node节点,尝试入队到等待队列的尾部,这里入队需要更改tail指针,也是个CAS操作;
  4. 入队后进入一个自旋循环,会不断地做如下的事情:
    1. 检查自己的前驱是否为head节点,若不是,直接挂起,等待被唤醒;
    2. 若前驱是head,则用CAS尝试获取锁,若没获取到(非公平锁可能被抢占),则直接挂起,等待被唤醒;
    3. 如果获取到了,说明拿到了锁,此时设置state为1、持有者为自己,并将自己从队列中删除(成为新的head,设置node的thread属性为null),并唤醒下一个节点;
    4. 被唤醒的节点重新开始自旋循环。

如果是公平锁,被唤醒的线程会拿到锁(不考虑极端情况),新来的线程会先判断等待队列中是否为空;为空的时候才尝试获取锁,否则直接入队。

Semaphore

信号量,用来控制同时访问特定资源的线程数量。

1
2
3
final Semaphore semaphore = new Semaphore(5); // 初始共享资源数量
semaphore.acquire(); // 获取1个许可
semaphore.release(); // 释放1个许可

流程和ReentrantLock完全一致,只是对state变量进行CAS时的条件发生了变化,默认state=共享资源数量,获取许可时state--,释放的时候state++

CountDownLatch

倒计时器,允许 count 个线程阻塞在一个地方,直至所有线程的任务都执行完毕。
它是一次性的,计数器的值只能在构造方法中初始化一次。

1
2
3
final CountDownLatch countDownLatch = new CountDownLatch(300);
// 在其他线程中执行500次countDownLatch.countDown();后,await()才停止阻塞
countDownLatch.await(); //

CyclicBarrier

循环栅栏,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。这在组合学习中可以设置。

JMM内存模型

即Java Memory Module,他是Java多线程中,一个纯逻辑的概念。可以将它与CPU缓存和内存类比,但是两者没有什么直接关系。

  1. 主内存:线程创建的所有实例对象都存放在主内存,线程之间的通信必须通过主内存实现。
  2. 本地内存(工作内存):每个线程拥有的私有内存,对其他线程不可见。每次修改共享变量时,需要从主内存中搞一个副本到本地内存,再修改,之后再将其同步到主内存中。

JMM(Java 内存模型)

happens-before原则

他是JVM做出的保证,为开发者提供了一套同步规范(或者叫规则)。它告诉开发者只要按照这套规则写代码,JVM就能保证符合某个操作结果对另一个操作的可见性,即使编译器和操作系统进行各种优化(包括指令重排序、缓存等等)

常见规则,happens-before其实就是“可见”的另一种说法。

  1. 一个线程内,写在前面的指令结果对后面可见。
  2. 线程A释放锁后,它在持有锁的这段时间的操作,对下一个获得锁的线程B可见。
  3. volatile变量的写,对接下来读取该变量的线程可见。

并发编程原则

  1. 原子性
    所有的操作全部都得到执行,或者全部不执行。可以借助synchronized、各种 Lock 以及各种原子类实现原子性。
  2. 可见性
    当一个线程对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。在 Java 中,可以借助synchronizedvolatile 以及各种 Lock 实现可见性。
  3. 有序性
    由于指令重排序问题,代码的执行顺序未必就是编写代码时候的顺序。重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致。

ThreadLocal

线程私有的局部变量副本,可以理解为一种共享资源,然后复制若干份副本给每个线程。

Thread类有一个类型为ThreadLocal.ThreadLocalMap的实例变量threadLocals,也就是说每个线程有一个自己的ThreadLocalMap

每个线程在往ThreadLocal里放值的时候,都会往自己的ThreadLocalMap里存,读也是以ThreadLocal作为引用,在自己的map里找对应的key,从而实现了线程隔离

在某个线程中设置某个外部的threadLocal的值为value,在这个threadLocal.set(value)函数中,会把this(即当前的threadLocal)和value键值对设置到Thread.currentThread()ThreadLocalMap字段上去(如果不存在会先创建)

![ThreadLocal数据结构](../../../../../Library/Application Support/typora-user-images/image-20250718193342972.png)

弱引用的key

为了避免key的内存泄漏,将ThreadLocalMap中的key设置为弱引用。
这是由于线程池的存在:线程实例会一直存在,线程实例中的ThreadLocalMap并不会被销毁的,所以Map里面的键值对一直会被Map引用的(当然这个键值对“的值”一定会被业务代码强引用)

所以为了避免键值对长期存在,将线程的ThreadLocalMap对threadLocal的引用标记为弱引用。当业务代码不再使用ThreadLocal的时候,通过将其设置为threadLocal=null,使得Map中threadLocal的key就只剩下弱引用了,这样这个threadLocal变量就能被回收了。

为什么value不是弱引用?因为存在资源被放入但没有强引用指向它的情况,导致它被GC回收,但后续可能还要被用呢。这样的强引用value确实会导致内存泄漏问题,因为存在ThreadLocalMap=>Entry=>Value的引用链。所以使用完ThreadLocal后必须调用remove方法,手动断开value的强引用,让GC去回收他。

ThreadLocalMap

一个ThreadLocal的内部类,但是被每个线程所持有,线程使用时会懒初始化。遇到哈希冲突时,会使用线性探测法。如果获取到没有的key,则会先初始化为null。

Executor相关类

Runnable 和 Callable 接口,前者@Override run方法,后者@Override call方法。前者没有返回值,后者有。工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象,即Executors.callable(Runnable task)Executors.callable(Runnable task, Object result)

Executor 和 ExecutorService 接口,前者提供 .execute(Runnable),后者提供.submit(Runnable|Callable)
.execute没有返回结果,无法接收Callable参数;.submit 返回 Future<?>Future<ResultType>

Future<?>.submit 一个 Runnable 的返回,Future<ResultType>.submit 一个Callable的返回(或者手动指定Runnable的返回值);

FutureTask<ResultType> 实现了Runnable,他可以把一个Callable的包装成Runnable的,并仍然支持获取返回值;既可以.execute 也可以 .submit,它可以接收 1. Callable 类型的任务;2. Runnable 类型的任务+返回值

Executor继承链

定时周期任务类

ScheduledThreadPoolExecutor实现类和ScheduledExecutorService接口类

  1. ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit): 在给定延迟后执行一次 Callable 任务。

  2. ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit): 在给定延迟后执行一次 Runnable 任务。

  3. ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit): 在指定初始延迟后,以固定频率(任务开始后固定时间间隔)周期性地执行任务。

  4. ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit): 在指定初始延迟后,以固定延迟(任务结束和下一次任务开始之间的固定时间间隔)周期性地执行任务。

ForkJoinPool

这个类感觉有点复杂,先不看

ListenExecutorService

线程池的参数

ThreadPoolExecutor构造函数_6个

  1. corePoolSize核心线程数。即使线程空闲,这些线程也会一直存在,除非设置了allowCoreThreadTimeOut
  2. maximumPoolSize最大线程数。线程池允许存在的最大线程数量。
  3. keepAliveTime非核心线程空闲时间。当线程数大于corePoolSize时,这些多余的空闲线程在等待新任务时,如果空闲时间超过keepAliveTime,它们就会被终止。
  • unitkeepAliveTime的单位。
  1. workQueue任务队列。用于存放等待执行的任务。常见的有ArrayBlockingQueue (有界队列)、LinkedBlockingQueue (无界队列)、SynchronousQueue (不存储元素的队列) 等。
  2. threadFactory线程工厂。用于创建新线程,本质是个实现了Thread newThread(Runnable r);方法的接口,可以自定义线程的设置,例如优先级、是否为守护进程等等。
  3. handler拒绝策略。当任务队列已满且线程数达到maximumPoolSize时,线程池会采取的拒绝策略。常见的有ThreadPoolExecutor.AbortPolicy (直接抛出异常)、CallerRunsPolicy (由调用者线程执行任务)、DiscardOldestPolicy (丢弃队列中最旧的任务)、DiscardPolicy (直接丢弃任务) 等。

Executors提供的线程池

  1. FixedThreadPool: 线程数量固定,使用阻塞队列LinkedBlockingQueue,是个无界队列(最大长度Integer.MAX_VALUE),可能堆积大量请求,导致OOM。
  2. SingleThreadExecutor: 提供单线程执行器,和FixedThreadPool设置1后的区别在于,SingleThreadExecutor被FinalizableDelegatedExecutorService包装,增加了finalize()方法来调用底层的shutdown()方法,提高了单线程执行器的健壮性。
  3. CachedThreadPool: 可根据实际情况调整线程数量,线程最大数量为 Integer.MAX_VALUE。使用同步队列 SynchronousQueue,该队列容量为零,每次offer(或poll)方法都必须等待另一线程进行poll(或offer),即该线程池内部会创建新线程来获取这个任务
    • 相当于任务越多,线程越多;线程作为一种资源进行“缓存”;对于的线程默认超过60s不用就销毁
  4. ScheduledThreadPool: 和## 定时周期任务类中直接创建“实现类”没有区别。使用DelayedWorkQueue延迟阻塞队列,该队列是个内部实现类,但功能和DelayQueue非常相似。
    • DelayQueue通过实现BlockingQueue接口,并且内部维护一个PriorityQueue,实现了:1. 当任务没有到期时,进行阻塞;2. 识别最先到期的任务(优先级最高),将其取出并执行
1
2
3
// 两者本质没有差异,唯一差异在返回值的类型。但动态类型是一致的
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new ScheduledThreadPoolExecutor(3);
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(3);