多執行緒環境下控制共享資源的訪問,是每個 Java 工程師必會的技能。Semaphore 控制許可數量,volatile 確保可見性。這兩個概念常常被搞混,今天一次說清楚。

Semaphore 基本概念

Semaphore 就是「信號量」,用來控制同時訪問某個資源的執行緒數量。

實時比喻

停車場有 10 個停車位。每個進來的車要先拿個 ticket(acquire 許可),停好了才能放 ticket(release 許可)。只有拿到 ticket 的車才能進。

1
2
3
Semaphore(10) = 10 個停車位
車進來:semaphore.acquire() // 許可數 -1
車停好:semaphore.release() // 許可數 +1

基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import java.util.concurrent.Semaphore;

// 建立 Semaphore,許可數 = 5
Semaphore semaphore = new Semaphore(5);

public void accessResource() throws InterruptedException {
semaphore.acquire(); // 取得許可(如果沒有就阻塞)
try {
// 使用資源
System.out.println(Thread.currentThread().getName() + " 正在訪問資源");
Thread.sleep(2000); // 模擬處理
} finally {
semaphore.release(); // 釋放許可
}
}

要點:

  • acquire()release() 要配對,通常放在 try-finally
  • 如果沒有許可,acquire() 會阻塞
  • 可以設定初始許可數(Semaphore(5) = 5 個許可)

多執行緒範例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class SemaphoreDemo {
static Semaphore semaphore = new Semaphore(2); // 最多 2 個執行緒同時訪問

public static void main(String[] args) {
for (int i = 1; i <= 5; i++) {
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName() + " 準備訪問");

semaphore.acquire();
long start = System.currentTimeMillis();
System.out.println(Thread.currentThread().getName() + " 開始訪問 (許可數: " + semaphore.availablePermits() + ")");

Thread.sleep(3000); // 模擬工作

long elapsed = System.currentTimeMillis() - start;
System.out.println(Thread.currentThread().getName() + " 結束 (花費 " + elapsed + "ms)");

semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "Thread-" + i).start();
}
}
}

// 輸出示例:
// Thread-1 準備訪問
// Thread-2 準備訪問
// Thread-3 準備訪問
// Thread-4 準備訪問
// Thread-5 準備訪問
// Thread-1 開始訪問 (許可數: 1)
// Thread-2 開始訪問 (許可數: 0)
// Thread-3 準備訪問(阻塞中,等待 Thread-1 或 Thread-2 釋放許可)
// ...

注意:同時最多只有 2 個執行緒在「開始訪問」,其他要排隊。

Semaphore 的應用場景

1. API 限流

不超過 10 個並行請求:

1
2
3
4
5
6
7
8
9
10
11
private static final Semaphore apiLimiter = new Semaphore(10);

public void callExternalAPI(String endpoint) throws InterruptedException {
apiLimiter.acquire();
try {
// 執行 API 呼叫
restTemplate.exchange(endpoint, HttpMethod.GET, null, String.class);
} finally {
apiLimiter.release();
}
}

2. 資料庫連線池

假設連線池只有 20 個連線:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private static final Semaphore connectionPool = new Semaphore(20);

public Connection getConnection() throws InterruptedException {
connectionPool.acquire();
Connection conn = pool.pop();
return new PooledConnection(conn, connectionPool);
}

class PooledConnection implements Connection {
private Connection underlying;
private Semaphore semaphore;

public void close() throws SQLException {
underlying.close();
semaphore.release(); // 歸還許可
}
}

3. 讀寫鎖(高級)

Semaphore 甚至可以實作讀寫鎖的概念。

volatile 關鍵字

volatile 確保共享變數的可見性有序性,但不保證原子性

什麼是可見性問題

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class VisibilityProblem {
private static boolean flag = false;

public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
try {
Thread.sleep(100); // 100ms 後改 flag
} catch (InterruptedException e) {}
flag = true;
System.out.println("Set flag to true");
}).start();

// main 執行緒一直檢查 flag
while (!flag) {
// 如果 flag 不是 volatile,可能會無限迴圈
// 因為 CPU 緩存的 flag 值沒更新
}

System.out.println("Flag is now true");
}
}

沒有 volatile,迴圈可能永遠跑不出來(儘管其他執行緒改了 flag)。

用 volatile 修復

1
private static volatile boolean flag = false;  // 加上 volatile

volatile 的作用:

  1. 每次讀取 - 從共享記憶體讀,不從 thread local cache
  2. 每次寫入 - 直接寫到共享記憶體
  3. 禁止指令重排 - 確保操作順序

volatile 的坑:不保證原子性

1
2
3
4
5
private static volatile int count = 0;

public static void increment() {
count++; // 這不是 atomic!
}

雖然 volatile,但 count++ 其實分成三步:

  1. 讀取 count(假設是 10)
  2. 加 1(變成 11)
  3. 寫回 count

多個執行緒同時做,會丟失更新:

1
2
3
4
// 執行緒 1 和 2 同時讀到 count = 10
// 執行緒 1:讀 10 -> 加 1 -> 寫 11
// 執行緒 2:讀 10 -> 加 1 -> 寫 11
// 本應是 12,結果是 11(丟失了一次更新)

修復方式:用 AtomicInteger

1
2
3
4
5
private static AtomicInteger count = new AtomicInteger(0);

public void increment() {
count.incrementAndGet(); // 原子操作
}

AtomicInteger 保證原子性(使用 CAS - Compare and Swap)。

實務例子

例子 1:執行緒安全的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class ThreadSafeInitialization {
private static volatile Singleton instance = null;
private static Semaphore initLock = new Semaphore(1);

public static Singleton getInstance() throws InterruptedException {
if (instance == null) {
initLock.acquire();
try {
// Double-check locking pattern
if (instance == null) {
instance = new Singleton();
}
} finally {
initLock.release();
}
}
return instance;
}
}

這是經典的 double-check locking(雖然在現代 Java 8+ 有更好的做法)。

例子 2:限制並行執行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public class TaskExecutor {
private final Semaphore semaphore = new Semaphore(3); // 最多 3 個並行

public void executeTask(Runnable task) throws InterruptedException {
semaphore.acquire();
try {
task.run();
} finally {
semaphore.release();
}
}
}

// 使用
TaskExecutor executor = new TaskExecutor();
List<Thread> threads = new ArrayList<>();

for (int i = 0; i < 10; i++) {
int taskId = i;
Thread t = new Thread(() -> {
try {
executor.executeTask(() -> {
System.out.println("Task " + taskId + " started");
Thread.sleep(2000);
System.out.println("Task " + taskId + " finished");
});
} catch (InterruptedException e) {
e.printStackTrace();
}
});
threads.add(t);
t.start();
}

// 最多同時 3 個任務跑,其他排隊

volatile 的替代品

現代 Java 有更好的選擇:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// volatile boolean
private volatile boolean flag = false;

// 更好:用 AtomicBoolean
private AtomicBoolean flag = new AtomicBoolean(false);

// volatile int
private volatile int counter = 0;

// 更好:用 AtomicInteger
private AtomicInteger counter = new AtomicInteger(0);

// volatile reference
private volatile Data data = null;

// 更好:用 AtomicReference
private AtomicReference<Data> data = new AtomicReference<>(null);

Atomic* 系列提供了原子性保證,還有 volatile 的可見性。

Semaphore vs ReentrantLock vs synchronized

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// synchronized - 最簡單,但只有一個許可
synchronized void criticalSection() {
// ...
}

// ReentrantLock - 比 synchronized 靈活,但還是一個許可
Lock lock = new ReentrantLock();
lock.lock();
try {
// ...
} finally {
lock.unlock();
}

// Semaphore - 多個許可
Semaphore sem = new Semaphore(5);
sem.acquire();
try {
// ...
} finally {
sem.release();
}

選擇:

  • 簡單互斥用 synchronized
  • 需要超時、條件變數用 ReentrantLock
  • 需要限制並行數量用 Semaphore

實務建議

  1. Semaphore 用於限流和連線池

    • API 速率限制
    • 資料庫連線池
    • 執行緒池隊列管理
  2. volatile 用於簡單的狀態標誌

    • 執行緒停止旗標
    • 初始化檢查
    • 不要用於計數器
  3. 計數器用 AtomicInteger/AtomicLong

    • 比 volatile int + 手動鎖更安全
    • 比 synchronized 更高效
  4. 複雜的並行邏輯用 concurrency 庫

    • CountDownLatch - 等待多個執行緒完成
    • CyclicBarrier - 同步屏障
    • Phaser - 分階段同步
    • ConcurrentHashMap - 執行緒安全的 Map

大多數時候,用對工具就能避免九成的併發坑。