題圖來自APOD
你好,這裏是codetrend專欄“高併發編程基礎”。
引言
在併發執行任務時,由於資源共享的存在,線程安全成為一個需要考慮的問題。與串行化程序相比,併發執行可以更好地利用CPU計算能力,提高系統的吞吐量。
例如,當B客户提交一個業務請求時,不需要等到A客户處理結束才能開始,這樣可以提升用户體驗。
然而,併發編程也帶來了新的挑戰。無論是互聯網系統還是企業級系統,在追求高性能的同時,穩定性也是至關重要的。開發人員需要掌握高效編程的技巧,以確保程序在安全的前提下能夠高效地共享數據。
共享資源指多個線程同時對同一份資源進行讀寫操作,這就需要保證多個線程訪問到的數據是一致的,即數據同步或資源同步。為了實現安全且高效的共享數據,以下是一些常用的方法和技術:
- 使用鎖(Lock):通過使用鎖機制,只有獲得鎖的線程才能訪問共享資源,其他線程需要等待鎖的釋放。常見的鎖包括synchronized關鍵字、ReentrantLock等。鎖機制可以保證共享資源在同一時間只被一個線程訪問,從而避免數據競爭和不一致的問題。
- 使用同步塊(Synchronized Block):通過在代碼塊前加上synchronized關鍵字,確保同一時間只有一個線程可以執行該代碼塊。這樣可以限制對共享資源的訪問,保證數據的一致性。
- 使用原子操作類(Atomic Classes):Java提供了一系列原子操作類,如AtomicInteger、AtomicLong等,它們可以保證針對共享資源的操作是原子性的,不會被其他線程中斷,從而避免了數據不一致的問題。
- 使用併發集合(Concurrent Collections):Java提供了一些併發安全的集合類,如ConcurrentHashMap、ConcurrentLinkedQueue等,它們在多線程環境下可以安全地進行讀寫操作,避免了手動處理同步和鎖的麻煩。
- 使用線程安全的設計模式:在程序設計階段,可以採用一些線程安全的設計模式,如不可變對象、線程本地存儲(Thread-local Storage)等,來避免共享資源的競爭和衝突。
數據不一致的問題
package engineer.concurrent.battle.abasic;
/**
* 叫號機排隊模擬,通過多線程併發
*/
public class TicketWindow extends Thread {
private final String name;
private final static int MAX = 100;
private static int ticket = 1;
public TicketWindow(String name) {
this.name = name;
}
public void run() {
while (ticket<= MAX) {
System.out.println(name + "櫃枱正在排隊,排隊號碼為:" + ticket);
ticket++;
}
}
public static void main(String[] args) {
new TicketWindow("一號窗口").start();
new TicketWindow("二號窗口").start();
new TicketWindow("三號窗口").start();
new TicketWindow("四號窗口").start();
}
}
可能的輸出結果如下:
三號窗口櫃枱正在排隊,排隊號碼為:1
四號窗口櫃枱正在排隊,排隊號碼為:1
四號窗口櫃枱正在排隊,排隊號碼為:3
三號窗口櫃枱正在排隊,排隊號碼為:2
四號窗口櫃枱正在排隊,排隊號碼為:7
...
四號窗口櫃枱正在排隊,排隊號碼為:101
四號窗口櫃枱正在排隊,排隊號碼為:102
其中 ticket 就是共享資源,多個TicketWindow運行多線程競爭共享資源。可能出現的問題如下。
ticket被重複使用,也就是一個號被多個窗口叫到。ticket超過最大限制,也就是實際沒得這個號但是卻叫號了。ticket沒有被使用,也就是一張號沒有被叫到。
下面的實例代碼是叫號機排隊模擬,通過多線程併發,使用synchronized解決資源共享問題
package engineer.concurrent.battle.esafe;
import java.util.concurrent.TimeUnit;
/**
* 叫號機排隊模擬,通過多線程併發,使用synchronized解決資源共享問題
*/
public class TicketWindowSynchronized implements Runnable {
private final static int MAX = 100;
private static Integer ticket = 1;
private static final Object lockObj = new Object();
public void run() {
while (ticket <= MAX) {
synchronized (lockObj) {
if (ticket <= MAX) { // 額外的判斷
System.out.println(Thread.currentThread() + "櫃枱正在排隊,排隊號碼為:" + ticket);
ticket++;
}
}
try {
TimeUnit.MILLISECONDS.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
public static void main(String[] args) {
TicketWindowSynchronized ticketTask = new TicketWindowSynchronized();
new Thread(ticketTask, "一號窗口").start();
new Thread(ticketTask, "二號窗口").start();
new Thread(ticketTask, "三號窗口").start();
new Thread(ticketTask, "四號窗口").start();
}
}
使用同步塊(Synchronized Block)
在Java中,同步塊(Synchronized Block)是一種用於實現線程同步的機制。它用於標記一段代碼,確保在同一時間只有一個線程可以執行這段代碼,以避免數據競爭和併發問題。synchronized 字段可以用於對象方法、代碼塊中。
- 同步實例方法:
public synchronized void synchronizedMethod() {
// 執行需要同步的代碼
}
在實例方法上使用synchronized關鍵字,將整個方法體標記為同步塊。當一個線程進入同步方法時,它將獲取該實例對象的鎖,並且其他線程將被阻塞等待鎖的釋放。
- 同步靜態方法:
public static synchronized void synchronizedStaticMethod() {
// 執行需要同步的代碼
}
在靜態方法上使用synchronized關鍵字,將整個靜態方法標記為同步塊。與同步實例方法類似,當一個線程進入同步靜態方法時,它將獲取類對象的鎖,並且其他線程將被阻塞等待鎖的釋放。
- 同步塊:
synchronized (lockObj) {
// 執行需要同步的代碼
}
使用synchronized關鍵字結合一個對象來創建同步塊。當一個線程進入同步塊時,它將獲取該對象的鎖,並且其他線程將被阻塞等待鎖的釋放。在同步塊內,只有一個線程可以執行被同步的代碼。
- 同步塊中的條件等待和喚醒:
synchronized (lockObj) {
while (!conditionMet) {
try {
lockObj.wait(); // 條件不滿足時,線程進入等待狀態並釋放鎖
} catch (InterruptedException e) {
// 處理中斷異常
}
}
// 執行需要同步的代碼
}
synchronized (lockObj) {
conditionMet = true; // 修改條件
lockObj.notify(); // 喚醒一個等待的線程
lockObj.notifyAll(); // 喚醒所有等待的線程
}
在同步塊中,使用對象的wait()方法讓線程進入等待狀態並釋放鎖。當某個條件滿足時,可以使用notify()或notifyAll()方法喚醒等待的線程。注意,在使用條件等待和喚醒時,需要確保線程在同一對象上等待和喚醒。
同步塊提供了一種簡單的方式來實現線程同步,通過獲取對象的鎖來保證同一時間只有一個線程可以執行同步塊內的代碼。這對於控制併發訪問共享資源非常有用。但是需要注意,如果多個線程競爭相同的鎖,可能會導致性能問題和死鎖情況的發生。因此,在使用同步塊時,需要仔細考慮鎖的粒度和設計。
測試代碼如下:
package engineer.concurrent.battle.esafe;
public class SynchronizedCounter {
private int c = 0;
public synchronized void increment() {
System.out.println(Thread.currentThread());;
c++;
}
public synchronized void decrement() {
c--;
}
public synchronized int value() {
return c;
}
}
package engineer.concurrent.battle.esafe;
import java.util.concurrent.CountDownLatch;
public class SynchronizedCounterTest {
public static void main(String[] args) throws InterruptedException {
SynchronizedCounter counter = new SynchronizedCounter();
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
counter.increment();
}
countDownLatch.countDown();
},"線程編號"+i).start();
}
countDownLatch.await();
System.out.println(counter.value());
}
}
輸出結果如下:
100000
使用鎖(Lock)
在Java中,鎖(Lock)是一種用於實現線程同步的機制。它可以確保在同一時間只有一個線程可以訪問共享資源,以避免數據競爭和併發問題。與傳統的synchronized關鍵字相比,Lock提供了更大的靈活性和功能。使用鎖(Lock)機制可以更細粒度地控制線程同步,並且提供了更多高級功能,例如可中斷的鎖獲取、定時鎖獲取和條件變量等待。這使得鎖成為Java中多線程編程的重要組件之一。
- 創建Lock對象:
Lock lock = new ReentrantLock();
- 獲取鎖:
lock.lock(); // 如果鎖可用,獲取鎖;否則等待鎖的釋放
或者帶有超時設置的獲取鎖:
boolean acquired = lock.tryLock(5, TimeUnit.SECONDS); // 嘗試在指定時間內獲取鎖,返回是否成功獲取鎖
if (acquired) {
try {
// 執行需要同步的代碼
} finally {
lock.unlock(); // 釋放鎖
}
} else {
// 獲取鎖失敗的處理邏輯
}
- 釋放鎖:
lock.unlock(); // 釋放鎖
- 使用鎖進行同步:
lock.lock();
try {
// 執行需要同步的代碼
} finally {
lock.unlock();
}
- 使用鎖的Condition進行條件等待和喚醒:
Condition condition = lock.newCondition();
// 等待條件滿足
lock.lock();
try {
while (!conditionMet) {
condition.await(); // 等待條件滿足並釋放鎖
}
// 執行需要同步的代碼
} catch (InterruptedException e) {
// 處理中斷異常
} finally {
lock.unlock();
}
// 喚醒等待的線程
lock.lock();
try {
condition.signal(); // 喚醒一個等待的線程
condition.signalAll(); // 喚醒所有等待的線程
} finally {
lock.unlock();
}
測試代碼如下:
package engineer.concurrent.battle.esafe;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class SynchronizedCounter3 {
private int c = 0;
Lock lock = new ReentrantLock();
public void increment() {
lock.lock();
System.out.println(Thread.currentThread());
c++;
lock.unlock();
}
public void decrement() {
lock.lock();
c--;
lock.unlock();
}
public int value() {
return c;
}
}
package engineer.concurrent.battle.esafe;
import java.util.concurrent.CountDownLatch;
public class SynchronizedCounter3Test {
public static void main(String[] args) throws InterruptedException {
SynchronizedCounter3 counter = new SynchronizedCounter3();
CountDownLatch countDownLatch = new CountDownLatch(10000);
for (int i = 0; i < 10000; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
counter.increment();
}
countDownLatch.countDown();
},"線程編號"+i).start();
}
countDownLatch.await();
System.out.println(counter.value());
}
}
輸出結果如下,在併發情況下輸出一致:
100000
使用原子操作類(Atomic Classes)
在Java中,原子操作類(Atomic Classes)是一組線程安全的工具類,用於進行原子性操作。它們提供了一些原子操作,可以確保在多線程環境下對共享變量的操作是原子的,不會出現數據競爭和併發問題。原子操作類提供了一些常見的原子操作方法,可以確保對共享變量的操作是原子的。它們適用於高併發場景,並且性能較好。使用原子操作類可以避免使用鎖帶來的開銷,並且能夠簡化線程同步的代碼邏輯。
需要注意的是,雖然原子操作類可以保證單個操作的原子性,但不能保證多個操作的原子性。如果需要進行復合操作,例如讀取-修改-寫入操作,仍然需要使用鎖或其他同步機制來保證原子性。另外,原子操作類在某些情況下可能會存在ABA問題,需要根據具體場景選擇合適的解決方案。
- AtomicBoolean:
AtomicBoolean atomicBoolean = new AtomicBoolean();
boolean currentValue = atomicBoolean.get(); // 獲取當前值
atomicBoolean.set(true); // 設置新值
boolean oldValue = atomicBoolean.getAndSet(false); // 先獲取當前值,再設置新值,並返回舊值
- AtomicInteger:
AtomicInteger atomicInteger = new AtomicInteger();
int currentValue = atomicInteger.get(); // 獲取當前值
atomicInteger.set(10); // 設置新值
int oldValue = atomicInteger.getAndSet(5); // 先獲取當前值,再設置新值,並返回舊值
int newValue = atomicInteger.incrementAndGet(); // 原子地增加1,並返回新值
int updatedValue = atomicInteger.updateAndGet(x -> x * 2); // 使用lambda表達式更新值,並返回更新後的值
- AtomicLong:
AtomicLong atomicLong = new AtomicLong();
long currentValue = atomicLong.get(); // 獲取當前值
atomicLong.set(100L); // 設置新值
long oldValue = atomicLong.getAndSet(50L); // 先獲取當前值,再設置新值,並返回舊值
long newValue = atomicLong.incrementAndGet(); // 原子地增加1,並返回新值
long updatedValue = atomicLong.updateAndGet(x -> x * 2); // 使用lambda表達式更新值,並返回更新後的值
- AtomicReference:
AtomicReference<String> atomicReference = new AtomicReference<>();
String currentValue = atomicReference.get(); // 獲取當前值
atomicReference.set("Hello"); // 設置新值
String oldValue = atomicReference.getAndSet("World"); // 先獲取當前值,再設置新值,並返回舊值
boolean updated = atomicReference.compareAndSet("World", "Java"); // 原子地比較和設置值,返回是否成功更新
測試代碼如下:
package engineer.concurrent.battle.esafe;
import java.util.concurrent.atomic.AtomicInteger;
public class SynchronizedCounter2 {
private AtomicInteger c = new AtomicInteger(0);
public synchronized void increment() {
System.out.println(Thread.currentThread());
c.incrementAndGet();
}
public void decrement() {
c.decrementAndGet();
}
public int value() {
return c.get();
}
}
package engineer.concurrent.battle.esafe;
import java.util.concurrent.CountDownLatch;
public class SynchronizedCounter2Test {
public static void main(String[] args) throws InterruptedException {
SynchronizedCounter2 counter = new SynchronizedCounter2();
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
counter.increment();
}
countDownLatch.countDown();
},"線程編號"+i).start();
}
countDownLatch.await();
System.out.println(counter.value());
}
}
輸出結果如下,在併發情況下輸出一致:
100000
使用併發集合(Concurrent Collections)
在Java中,有一組併發集合(Concurrent Collections)可以用於在多線程環境下安全地操作共享數據。這些集合類提供了線程安全的操作,並且能夠處理高併發的情況,常用於多線程編程和併發控制。併發集合提供了一些常見的數據結構和操作方法,能夠在多線程環境下安全地進行讀寫操作。它們採用了特定的併發控制策略,以提供高效的線程安全性能。需要根據具體的場景選擇合適的併發集合類,以滿足線程安全和併發控制的需求。
需要注意的是,併發集合並不適用於所有情況。在某些場景下,例如需要保持原子性操作或依賴複合操作的情況下,可能需要使用其他的同步機制來確保線程安全性。此外,雖然併發集合可以提供更好的性能和擴展性,但在某些情況下可能會佔用更多的內存,需要根據具體情況進行權衡和選擇。
- ConcurrentHashMap:
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key1", 1); // 插入鍵值對
int value = map.get("key1"); // 獲取指定鍵的值
boolean containsKey = map.containsKey("key2"); // 檢查是否包含指定的鍵
Integer oldValue = map.putIfAbsent("key1", 2); // 當鍵不存在時才插入新值
map.remove("key1"); // 移除指定鍵的鍵值對
- ConcurrentLinkedQueue:
ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
queue.add("item1"); // 添加元素到隊列
String head = queue.peek(); // 獲取隊列頭部元素
String removedItem = queue.poll(); // 移除並返回隊列頭部元素
- CopyOnWriteArrayList:
CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<>();
list.add("item1"); // 添加元素到列表
String item = list.get(0); // 獲取指定索引處的元素
list.set(0, "newItem"); // 替換指定索引處的元素
boolean removed = list.remove("item1"); // 移除指定元素
- ConcurrentSkipListMap:
ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
map.put(1, "value1"); // 插入鍵值對
Integer key = map.firstKey(); // 獲取第一個鍵
String value = map.get(key); // 根據鍵獲取值
map.remove(key); // 移除指定鍵的鍵值對
測試代碼如下:
package engineer.concurrent.battle.esafe;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class SynchronizedCounter5 {
private ConcurrentMap<String, Integer> counter = new ConcurrentHashMap<>();
private final String key = "threadName";
public void increment() {
counter.compute(key, (key, value) -> (value == null) ? 1 : value + 1);
}
public void decrement() {
counter.compute(key, (key, value) -> (value != null && value > 0) ? value - 1 : 0);
}
public int value() {
return counter.values().stream().mapToInt(Integer::intValue).sum();
}
}
package engineer.concurrent.battle.esafe;
import java.util.concurrent.CountDownLatch;
public class SynchronizedCounterTest5 {
public static void main(String[] args) throws InterruptedException {
SynchronizedCounter5 counter = new SynchronizedCounter5();
CountDownLatch countDownLatch = new CountDownLatch(1000);
for (int i = 0; i < 1000; i++) {
new Thread(() -> {
for (int j = 0; j < 100; j++) {
counter.increment();
}
countDownLatch.countDown();
},"線程編號"+i).start();
}
countDownLatch.await();
System.out.println(counter.value());
}
}
輸出結果如下,在併發情況下輸出一致:
100000
死鎖原因和分析
死鎖的產生
因為線程中鎖的加入和線程同步到需求存在,資源的競爭問題解決了,但問題出現在解決辦法(也就是鎖)的不合理使用會導致死鎖的出現。死鎖是多線程編程中常見的問題,指兩個或多個線程因為互相持有對方需要的鎖而陷入了無限等待的狀態。Java中的死鎖通常發生在如下情況下:
- 競爭有限資源:多個線程同時競爭一些有限的資源,例如數據庫連接、文件句柄等。
- 鎖嵌套:一個線程持有一個鎖,嘗試獲取另一個鎖,而另一個線程持有第二個鎖並嘗試獲取第一個鎖。
下面是一個造成死鎖的示例代碼:
/**
* 在示例代碼中,兩個線程分別持有 lock1 和 lock2,並嘗試獲取對方持有的鎖。如果這兩個線程同時運行,就會發生死鎖,因為它們互相持有了對方需要的鎖。
*/
public class DeadlockExample {
static Object lock1 = new Object();
static Object lock2 = new Object();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
synchronized (lock1) { // 獲取 lock1
System.out.println("Thread 1: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
synchronized (lock2) { // 嘗試獲取 lock2
System.out.println("Thread 1: Holding lock 1 & 2...");
}
}
},"線程001");
Thread t2 = new Thread(() -> {
synchronized (lock2) { // 獲取 lock2
System.out.println("Thread 2: Holding lock 2...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
synchronized (lock1) { // 嘗試獲取 lock1
System.out.println("Thread 2: Holding lock 1 & 2...");
}
}
},"線程002");
t1.start();
t2.start();
}
}
死鎖的分析
通過jdk提供的開箱即用工具可以快速定位問題。以下是DeadlockExample產生死鎖的定位過程。
- 使用
jps查看當前進程的pid。
jps
20580 Launcher
44712 RemoteMavenServer36
12428 Jps
14556
25052 DeadlockExample
- 使用
jstack pid命令分析堆棧信息。輸出結果如下。
Found one Java-level deadlock:
=============================
"線程001":
waiting to lock monitor 0x0000019d2d1eb820 (object 0x0000000713cfa108, a java.lang.Object),
which is held by "線程002"
"線程002":
waiting to lock monitor 0x0000019d2d1eb740 (object 0x0000000713cfa0f8, a java.lang.Object),
which is held by "線程001"
Java stack information for the threads listed above:
===================================================
"線程001":
at engineer.concurrent.battle.esafe.DeadlockExample.lambda$main$0(DeadlockExample.java:20)
- waiting to lock <0x0000000713cfa108> (a java.lang.Object)
- locked <0x0000000713cfa0f8> (a java.lang.Object)
at engineer.concurrent.battle.esafe.DeadlockExample$$Lambda$14/0x0000000800c01200.run(Unknown Source)
at java.lang.Thread.run(java.base@17.0.7/Thread.java:833)
"線程002":
at engineer.concurrent.battle.esafe.DeadlockExample.lambda$main$1(DeadlockExample.java:34)
- waiting to lock <0x0000000713cfa0f8> (a java.lang.Object)
- locked <0x0000000713cfa108> (a java.lang.Object)
at engineer.concurrent.battle.esafe.DeadlockExample$$Lambda$15/0x0000000800c01418.run(Unknown Source)
at java.lang.Thread.run(java.base@17.0.7/Thread.java:833)
Found 1 deadlock.
上面的jstack信息清晰的給出了死鎖的代碼位置和線程名稱。通過這兩個信息可以定位到代碼塊進行對應問題的修復。
死鎖的避免
要避免死鎖,可以採取以下策略:
- 避免鎖嵌套:儘量減少鎖嵌套的層數,以避免死鎖的發生。
- 按固定順序獲取鎖:多個線程按照固定的順序獲取鎖,以避免交叉競爭造成的死鎖。
- 使用 tryLock() 方法:tryLock() 方法可以嘗試獲取鎖一段時間,如果失敗則放棄獲取鎖,避免一直等待造成的死鎖。
- 使用 LockInterruptibly() 方法:LockInterruptibly() 方法可以在等待鎖的過程中響應中斷信號,避免無限等待造成的死鎖。
- 合理設計資源分配:合理地劃分和分配資源,以避免資源爭用和死鎖的產生。
修改後的代碼如下:
package engineer.concurrent.battle.esafe;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* 使用 tryLock() 方法:tryLock() 方法可以嘗試獲取鎖一段時間,如果失敗則放棄獲取鎖,避免一直等待造成的死鎖。
*/
public class DeadlockExampleFix {
static Lock lock1 = new ReentrantLock();
static Lock lock2 = new ReentrantLock();
public static void main(String[] args) {
Thread t1 = new Thread(() -> {
if(lock1.tryLock()) { // 獲取 lock1
System.out.println("Thread 1: Holding lock 1...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
if(lock2.tryLock()) { // 嘗試獲取 lock2
System.out.println("Thread 1: Holding lock 1 & 2...");
}
}
},"線程001");
Thread t2 = new Thread(() -> {
if(lock2.tryLock()) { // 獲取 lock2
System.out.println("Thread 2: Holding lock 2...");
try {
Thread.sleep(10);
} catch (InterruptedException e) {}
if(lock1.tryLock()) { // 嘗試獲取 lock1
System.out.println("Thread 2: Holding lock 1 & 2...");
}
}
},"線程002");
t1.start();
t2.start();
}
}
輸出結果如下:
Thread 2: Holding lock 2...
Thread 1: Holding lock 1...
參考
- 《Java高併發編程詳解:多線程與架構設計》
關於作者
來自一線全棧程序員nine的探索與實踐,持續迭代中。歡迎關注公眾號“雨林尋北”或添加個人衞星codetrend(備註技術)。