This article targets Java 11+.
Consistency is one of the core challenges in building reliable distributed systems. ZooKeeper's ZAB protocol and the Paxos algorithm are two mainstream solutions, each with its own theoretical foundations and engineering trade-offs. This article takes a close look at their implementation mechanics, performance characteristics, and best practices.
1. Basic Concepts
The ZAB Protocol
ZAB (ZooKeeper Atomic Broadcast) is the distributed consistency protocol designed specifically for ZooKeeper. Its core goal is to guarantee atomicity and ordering for data updates across the ensemble.
The Paxos Algorithm
Paxos, introduced by Leslie Lamport, is a general-purpose distributed consensus algorithm and the theoretical foundation of many distributed systems. It solves the problem of reaching agreement over an unreliable network.
2. ZAB Protocol Implementation
The ZAB protocol operates in two modes:
- Recovery mode: triggered at system startup or when the Leader crashes
- Broadcast mode: handles write requests during normal operation
Core Interface Definitions
public interface ZabProcessor {
// Recovery-mode API
boolean startRecovery() throws RecoveryException;
// Broadcast-mode API
CompletableFuture<Boolean> processWrite(Request request);
CompletableFuture<Result> processRead(String key, ConsistencyLevel level);
// State-inspection API
boolean isLeader();
long getCurrentEpoch();
}
public interface NetworkClient {
// Basic network transport
void connect(String serverId, String address, int port) throws IOException;
void disconnect(String serverId);
// ZAB protocol messages
ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException;
void sendCommit(String serverId, CommitPacket commit) throws IOException;
LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt) throws IOException;
boolean sendTruncate(String serverId, TruncatePacket truncPkt) throws IOException;
boolean sendTransactions(String serverId, List<Transaction> txns) throws IOException;
boolean sendNewLeader(String serverId, NewLeaderPacket newLeaderPkt) throws IOException;
void sendHeartbeat(String serverId, long zxid) throws IOException;
void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException;
}
public interface StateMachine {
void apply(long zxid, byte[] command) throws Exception;
long getLastAppliedZxid();
byte[] takeSnapshot() throws Exception;
void restoreSnapshot(byte[] snapshot, long zxid) throws Exception;
}
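The recovery and election code below relies on a ZxidUtils helper that is not spelled out in this article. Here is a minimal sketch under the conventional zxid layout (epoch in the high 32 bits, per-epoch counter in the low 32 bits); the exact shape of the class is an assumption, including the instance-level getLastZxid() consumed by the leader election later on.
public class ZxidUtils {
    private final AtomicLong lastZxid = new AtomicLong(0);
    // Latest zxid seen by this node (used by the leader election code below)
    public long getLastZxid() {
        return lastZxid.get();
    }
    public void updateLastZxid(long zxid) {
        lastZxid.accumulateAndGet(zxid, Math::max);
    }
    // The epoch lives in the high 32 bits of a zxid
    public static int getEpochFromZxid(long zxid) {
        return (int) (zxid >>> 32);
    }
    // The per-epoch counter lives in the low 32 bits
    public static long getCounterFromZxid(long zxid) {
        return zxid & 0xFFFFFFFFL;
    }
    public static long makeZxid(int epoch, long counter) {
        return ((long) epoch << 32) | (counter & 0xFFFFFFFFL);
    }
    // Unsigned comparison so a sign-bit overflow in the epoch cannot invert the order
    public static long compareZxid(long a, long b) {
        return Long.compareUnsigned(a, b);
    }
}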
ZAB Recovery Mode Implementation
public class ZABRecovery {
private final AtomicLong zxid = new AtomicLong(0);
private final AtomicInteger epoch = new AtomicInteger(0);
private volatile ServerState state = ServerState.LOOKING;
private final Logger logger = LoggerFactory.getLogger(ZABRecovery.class);
private final ConcurrentMap<String, ServerData> serverDataMap;
private final int quorumSize;
private final NetworkClient networkClient;
private final StateMachine stateMachine;
private final String serverId;
// Constructor
public ZABRecovery(String serverId, int quorumSize, NetworkClient networkClient,
StateMachine stateMachine) {
this.serverId = serverId;
this.quorumSize = quorumSize;
this.networkClient = networkClient;
this.stateMachine = stateMachine;
this.serverDataMap = new ConcurrentHashMap<>();
}
// Leader recovery flow
public boolean startRecovery() throws RecoveryException {
MDC.put("component", "zab-recovery");
MDC.put("serverId", serverId);
try {
// 1. Bump the election epoch
int newEpoch = epoch.incrementAndGet();
logger.info("Starting recovery with epoch: {}", newEpoch);
// 2. Discovery phase: collect state from every Follower
Map<Long, Set<String>> commitMap = discoverFollowerStates();
// 3. Determine the truncation/commit point
long truncateZxid = determineMaxCommittedZxid(commitMap);
logger.info("Determined truncate zxid: {}", Long.toHexString(truncateZxid));
// 4. Resolve potential conflicts (after a split brain)
resolveConflictsAfterPartition(truncateZxid, commitMap);
// 5. Synchronization phase: ship historical transactions to the Followers
syncFollowers(truncateZxid);
// 6. Switch to broadcast mode
state = ServerState.LEADING;
logger.info("Recovery completed, switching to broadcast mode");
return true;
} catch (IOException e) {
logger.error("Recovery failed due to I/O error", e);
state = ServerState.LOOKING;
throw new RecoveryException("I/O error during recovery", e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Recovery interrupted", e);
state = ServerState.LOOKING;
throw new RecoveryException("Recovery process interrupted", e);
} catch (Exception e) {
logger.error("Unexpected error during recovery", e);
state = ServerState.LOOKING;
throw new RecoveryException("Unexpected error during recovery", e);
} finally {
MDC.remove("component");
MDC.remove("serverId");
}
}
// Discovery phase: collect the latest transaction info from every Follower
private Map<Long, Set<String>> discoverFollowerStates() throws IOException, InterruptedException {
Map<Long, Set<String>> acceptedZxids = new ConcurrentHashMap<>();
CountDownLatch latch = new CountDownLatch(serverDataMap.size());
List<CompletableFuture<?>> futures = new ArrayList<>();
// Send a CEPOCH message to every Follower
for (var entry : serverDataMap.entrySet()) {
final String targetServerId = entry.getKey();
final ServerData serverData = entry.getValue();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
MDC.put("targetServerId", targetServerId);
try {
// Send the new epoch
EpochPacket epochPkt = new EpochPacket(epoch.get());
LastZxidResponse response = networkClient.sendEpochRequest(
targetServerId, epochPkt);
// Record this server's latest zxid; ConcurrentHashMap.newKeySet() yields a
// thread-safe set, so no external synchronization is needed (the original
// plain HashSet was not safe for concurrent adds)
acceptedZxids.computeIfAbsent(response.getLastZxid(), k -> ConcurrentHashMap.newKeySet())
.add(targetServerId);
logger.info("Server {} last zxid: {}", targetServerId,
Long.toHexString(response.getLastZxid()));
} catch (IOException e) {
logger.error("Failed to discover state from server: {}", targetServerId, e);
} finally {
MDC.remove("targetServerId");
latch.countDown();
}
});
futures.add(future);
}
// Wait for a majority of responses, or time out
if (!latch.await(10, TimeUnit.SECONDS)) {
logger.warn("Discovery phase timed out, proceeding with available responses");
}
// Cancel any tasks that have not completed
for (CompletableFuture<?> future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
return acceptedZxids;
}
// Determine the highest committed transaction id that must be retained
private long determineMaxCommittedZxid(Map<Long, Set<String>> commitMap) {
// Find the highest zxid acknowledged by a quorum
long maxZxid = 0;
int quorum = getQuorum();
for (var entry : commitMap.entrySet()) {
if (entry.getValue().size() >= quorum && entry.getKey() > maxZxid) {
maxZxid = entry.getKey();
}
}
return maxZxid;
}
// Resolve possible data conflicts after a network partition
private void resolveConflictsAfterPartition(long truncateZxid,
Map<Long, Set<String>> commitMap) {
logger.info("Checking for potential conflicts after network partition");
// 1. Identify potentially conflicting transactions: higher zxids not held by a quorum
List<ConflictingTransaction> conflicts = new ArrayList<>();
for (var entry : commitMap.entrySet()) {
long txnZxid = entry.getKey();
Set<String> servers = entry.getValue();
// The zxid lies beyond the agreed truncation point but lacks quorum acknowledgement
if (txnZxid > truncateZxid && servers.size() < getQuorum()) {
// Extract the transaction's epoch
int txnEpoch = ZxidUtils.getEpochFromZxid(txnZxid);
int truncateEpoch = ZxidUtils.getEpochFromZxid(truncateZxid);
conflicts.add(new ConflictingTransaction(txnZxid, truncateZxid,
txnEpoch, truncateEpoch,
servers));
}
}
// 2. Handle the conflicts
if (!conflicts.isEmpty()) {
logger.warn("Found {} potential conflicting transactions after partition",
conflicts.size());
for (ConflictingTransaction conflict : conflicts) {
if (conflict.isFromHigherEpoch()) {
logger.warn("Conflict: transaction with zxid {} from higher epoch {} " +
"found but not in majority. Will be discarded.",
Long.toHexString(conflict.getConflictZxid()),
conflict.getConflictEpoch());
} else {
logger.warn("Conflict: transaction with zxid {} from same epoch {} " +
"found but not in majority. Will be discarded.",
Long.toHexString(conflict.getConflictZxid()),
conflict.getConflictEpoch());
}
// Tell these servers to truncate the conflicting transactions
notifyServersToTruncate(conflict.getServers(), truncateZxid);
}
} else {
logger.info("No conflicting transactions found");
}
}
// Notify servers to truncate transactions beyond the safe point
private void notifyServersToTruncate(Set<String> servers, long truncateZxid) {
for (String serverId : servers) {
CompletableFuture.runAsync(() -> {
try {
TruncatePacket truncPkt = new TruncatePacket(truncateZxid);
boolean success = networkClient.sendTruncate(serverId, truncPkt);
if (success) {
logger.info("Successfully notified server {} to truncate to zxid {}",
serverId, Long.toHexString(truncateZxid));
} else {
logger.warn("Failed to notify server {} to truncate", serverId);
}
} catch (IOException e) {
logger.error("Error notifying server {} to truncate", serverId, e);
}
});
}
}
// Synchronization phase: ship historical transactions to the Followers
private void syncFollowers(long truncateZxid) throws IOException, InterruptedException {
// Load every transaction from truncateZxid onwards
List<Transaction> txns = loadTransactionsFromLog(truncateZxid);
logger.info("Syncing {} transactions to followers", txns.size());
// Sync all Followers in parallel
CountDownLatch syncLatch = new CountDownLatch(serverDataMap.size());
AtomicInteger successCount = new AtomicInteger(0);
List<CompletableFuture<?>> futures = new ArrayList<>();
for (var entry : serverDataMap.entrySet()) {
final String targetServerId = entry.getKey();
final ServerData serverData = entry.getValue();
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
MDC.put("targetServerId", targetServerId);
try {
// Check whether this Follower has fallen far enough behind to need a snapshot
long followerZxid = serverData.getLastZxid();
if (truncateZxid - followerZxid > SNAPSHOT_THRESHOLD) {
syncFollowerWithSnapshot(targetServerId, followerZxid);
} else {
// 1. Send TRUNC so the Follower trims its log
TruncatePacket truncPkt = new TruncatePacket(truncateZxid);
if (networkClient.sendTruncate(targetServerId, truncPkt)) {
// 2. Send DIFF to ship the missing transactions
if (networkClient.sendTransactions(targetServerId, txns)) {
// 3. Send NEWLEADER to confirm synchronization is complete
NewLeaderPacket newLeaderPkt = new NewLeaderPacket(epoch.get());
if (networkClient.sendNewLeader(targetServerId, newLeaderPkt)) {
// Sync succeeded
successCount.incrementAndGet();
logger.info("Successfully synced server: {}", targetServerId);
}
}
}
}
} catch (IOException e) {
logger.error("Failed to sync server {} with {} transactions, last zxid: {}",
targetServerId, txns.size(), Long.toHexString(truncateZxid), e);
} finally {
MDC.remove("targetServerId");
syncLatch.countDown();
}
});
futures.add(future);
}
// Wait for synchronization to finish, or time out
if (!syncLatch.await(30, TimeUnit.SECONDS)) {
logger.warn("Sync phase timed out");
}
// Cancel any tasks that have not completed
for (CompletableFuture<?> future : futures) {
if (!future.isDone()) {
future.cancel(true);
}
}
// Check that enough servers synced successfully
if (successCount.get() < quorumSize) {
throw new QuorumNotFoundException("Failed to sync with quorum of followers",
successCount.get(), quorumSize);
}
}
// Use a snapshot to sync a Follower that has fallen too far behind
private void syncFollowerWithSnapshot(String followerId, long followerZxid) throws IOException {
try {
logger.info("Follower {} is too far behind (zxid: {}), syncing with snapshot",
followerId, Long.toHexString(followerZxid));
// 1. Take a snapshot of the current state
byte[] snapshot = stateMachine.takeSnapshot();
// 2. Ship the snapshot to the Follower
networkClient.sendSnapshot(followerId, snapshot, zxid.get());
logger.info("Successfully sent snapshot to follower: {}", followerId);
} catch (Exception e) {
logger.error("Failed to sync follower {} with snapshot", followerId, e);
throw new IOException("Snapshot sync failed", e);
}
}
// Load transactions from the transaction log
private List<Transaction> loadTransactionsFromLog(long fromZxid) throws IOException {
List<Transaction> result = new ArrayList<>();
// A real implementation would read transaction records from persistent storage
logger.info("Loading transactions starting from zxid: {}", Long.toHexString(fromZxid));
return result;
}
private int getQuorum() {
// quorumSize is already the majority count passed to the constructor (it is compared
// against successCount directly in syncFollowers), so return it as-is; dividing it
// again, as the original did, would under-count the required quorum
return quorumSize;
}
// Constants
private static final long SNAPSHOT_THRESHOLD = 100000; // fall back to a snapshot once the gap exceeds 100,000 transactions
// Data structure describing a conflicting transaction
static class ConflictingTransaction {
private final long conflictZxid;
private final long truncateZxid;
private final int conflictEpoch;
private final int truncateEpoch;
private final Set<String> servers;
public ConflictingTransaction(long conflictZxid, long truncateZxid,
int conflictEpoch, int truncateEpoch,
Set<String> servers) {
this.conflictZxid = conflictZxid;
this.truncateZxid = truncateZxid;
this.conflictEpoch = conflictEpoch;
this.truncateEpoch = truncateEpoch;
this.servers = new HashSet<>(servers);
}
public boolean isFromHigherEpoch() {
return conflictEpoch > truncateEpoch;
}
public long getConflictZxid() {
return conflictZxid;
}
public int getConflictEpoch() {
return conflictEpoch;
}
public Set<String> getServers() {
return Collections.unmodifiableSet(servers);
}
}
// Other inner class definitions...
enum ServerState {
LOOKING,    // searching for a Leader
FOLLOWING,  // acting as a Follower
LEADING     // acting as the Leader
}
}
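A quick usage sketch of the recovery flow above. The state-machine implementation and the quorum figure are assumptions (InMemoryStateMachine is a placeholder name); NettyNetworkClient refers to the example client implementation later in this article.
// Hypothetical wiring: a 5-node ensemble, so the majority (quorum) is 3
NetworkClient client = new NettyNetworkClient(3000);
StateMachine stateMachine = new InMemoryStateMachine(); // illustrative StateMachine implementation
ZABRecovery recovery = new ZABRecovery("server-1", 3, client, stateMachine);
try {
    if (recovery.startRecovery()) {
        // Recovery succeeded: this node is LEADING and may enter broadcast mode
    }
} catch (RecoveryException e) {
    // Recovery failed: drop back to LOOKING and rerun leader election
}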
ZAB Broadcast Mode Implementation
public class ZABBroadcast implements AutoCloseable {
private final AtomicLong zxid;
private final AtomicInteger epoch;
private final ConcurrentMap<String, ServerData> followers;
private final Logger logger = LoggerFactory.getLogger(ZABBroadcast.class);
private final CircuitBreaker circuitBreaker;
private final NetworkClient networkClient;
private final StateMachine stateMachine;
private final String serverId;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private final ScheduledExecutorService scheduler;
private final MetricsCollector metrics;
private final RateLimiter heartbeatLogLimiter = RateLimiter.create(0.1); // at most one log line every 10 seconds
public ZABBroadcast(String serverId, AtomicLong zxid, AtomicInteger epoch,
NetworkClient networkClient, StateMachine stateMachine) {
this.serverId = serverId;
this.zxid = zxid;
this.epoch = epoch;
this.networkClient = networkClient;
this.stateMachine = stateMachine;
this.followers = new ConcurrentHashMap<>();
this.circuitBreaker = new CircuitBreaker(5, 10000); // open after 5 failures, 10-second reset window
this.scheduler = Executors.newScheduledThreadPool(2, r -> {
Thread t = new Thread(r, "zab-scheduler-" + serverId);
t.setDaemon(true);
return t;
});
this.metrics = new MetricsCollector("zab_broadcast");
// Start the heartbeat task
scheduler.scheduleWithFixedDelay(this::sendHeartbeats,
500, 500, TimeUnit.MILLISECONDS);
}
// Add a Follower
public void addFollower(ServerData follower) {
followers.put(follower.getId(), follower);
}
// Leader handles a write request
public CompletableFuture<Boolean> processWrite(Request request) {
Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "zab-broadcast");
MDC.put("serverId", serverId);
MDC.put("requestId", request.getId());
try {
return GlobalExceptionHandler.withExceptionHandling(
circuitBreaker.execute(() -> {
try {
// 1. Generate a zxid for the request (epoch in the high 32 bits, counter in the low 32)
long newZxid = createNewZxid();
MDC.put("zxid", Long.toHexString(newZxid));
logger.info("Processing write request: {} with zxid: {}",
request.getId(), Long.toHexString(newZxid));
// 2. Send the request to every Follower
List<Future<ACK>> futures = sendToFollowers(request, newZxid);
// 3. Wait for ACKs from a majority of Followers
if (waitForMajority(futures)) {
// 4. Tell every Follower to commit the transaction
commit(newZxid);
logger.info("Request {} committed successfully", request.getId());
// 5. Record metrics
metrics.recordSuccessfulWrite(stopwatch.elapsed(TimeUnit.MILLISECONDS));
return CompletableFuture.completedFuture(true);
} else {
logger.warn("Failed to get majority ACKs for request {}", request.getId());
metrics.recordFailedWrite();
return CompletableFuture.completedFuture(false);
}
} catch (IOException e) {
logger.error("Failed to process write request: {}", request.getId(), e);
metrics.recordFailedWrite();
return CompletableFuture.failedFuture(
new ProcessingException("Failed to process write request", e));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while processing write request: {}", request.getId(), e);
metrics.recordFailedWrite();
return CompletableFuture.failedFuture(
new ProcessingException("Interrupted during write processing", e));
}
})
);
} catch (CircuitBreakerOpenException e) {
logger.error("Circuit breaker is open, rejecting request: {}", request.getId());
metrics.recordRejectedWrite();
return CompletableFuture.failedFuture(
new ProcessingException("Circuit breaker open, system overloaded", e));
} finally {
MDC.remove("component");
MDC.remove("serverId");
MDC.remove("requestId");
MDC.remove("zxid");
}
}
// Handle batched writes to improve throughput
public CompletableFuture<Map<String, Boolean>> processBatchWrite(List<Request> requests) {
if (requests.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyMap());
}
Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "zab-broadcast");
MDC.put("serverId", serverId);
MDC.put("batchSize", String.valueOf(requests.size()));
try {
return GlobalExceptionHandler.withExceptionHandling(
circuitBreaker.execute(() -> {
Map<String, Boolean> results = new HashMap<>();
try {
// Build the batch packet
BatchRequest batch = new BatchRequest();
for (Request req : requests) {
batch.addRequest(req);
results.put(req.getId(), false); // default to failure
}
// Generate a single zxid for the whole batch
long batchZxid = createNewZxid();
MDC.put("zxid", Long.toHexString(batchZxid));
logger.info("Processing batch of {} requests with zxid: {}",
requests.size(), Long.toHexString(batchZxid));
// Send the batch to every Follower
List<Future<ACK>> futures = sendBatchToFollowers(batch, batchZxid);
// Wait for quorum acknowledgement
if (waitForMajority(futures)) {
// Commit the batch
commitBatch(batchZxid);
logger.info("Batch with {} requests committed successfully", requests.size());
// Mark every request in the batch as successful
for (Request req : requests) {
results.put(req.getId(), true);
}
metrics.recordSuccessfulBatchWrite(
requests.size(), stopwatch.elapsed(TimeUnit.MILLISECONDS));
} else {
logger.warn("Failed to get majority ACKs for batch");
metrics.recordFailedBatchWrite(requests.size());
}
} catch (Exception e) {
logger.error("Error processing batch write of {} requests", requests.size(), e);
metrics.recordFailedBatchWrite(requests.size());
}
return CompletableFuture.completedFuture(results);
})
);
} catch (CircuitBreakerOpenException e) {
logger.error("Circuit breaker is open, rejecting batch of {} requests", requests.size());
metrics.recordRejectedBatchWrite(requests.size());
Map<String, Boolean> results = new HashMap<>();
for (Request req : requests) {
results.put(req.getId(), false);
}
return CompletableFuture.failedFuture(
new ProcessingException("Circuit breaker open, system overloaded", e));
} finally {
MDC.remove("component");
MDC.remove("serverId");
MDC.remove("batchSize");
MDC.remove("zxid");
}
}
// Reads with an explicit consistency guarantee
public CompletableFuture<Result> readWithConsistency(String key, ConsistencyLevel level) {
Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "zab-broadcast");
MDC.put("serverId", serverId);
MDC.put("key", key);
MDC.put("consistency", level.name());
try {
ReadStrategy strategy = readStrategies.getOrDefault(
level, readStrategies.get(ConsistencyLevel.EVENTUAL));
CompletableFuture<Result> result = strategy.execute(key, this::readLocal);
result.thenAccept(r ->
metrics.recordRead(level, stopwatch.elapsed(TimeUnit.MILLISECONDS)));
return result;
} catch (Exception e) {
logger.error("Error performing {} read for key: {}", level, key, e);
metrics.recordFailedRead(level);
return CompletableFuture.failedFuture(
new ProcessingException("Read operation failed", e));
} finally {
MDC.remove("component");
MDC.remove("serverId");
MDC.remove("key");
MDC.remove("consistency");
}
}
// Read data locally
private Result readLocal(String key) {
rwLock.readLock().lock();
try {
// A real implementation would read from the local store
return new Result(key, "value", true);
} finally {
rwLock.readLock().unlock();
}
}
// Generate a new zxid, handling counter overflow
private long createNewZxid() {
rwLock.writeLock().lock();
try {
long currentCounter = zxid.get() & 0xFFFFFFFFL;
// Detect and handle overflow
if (currentCounter >= 0xFFFFFFFFL) {
// The counter is about to overflow; bump the epoch
int newEpoch = epoch.incrementAndGet();
logger.warn("ZXID counter overflow, incrementing epoch to {}", newEpoch);
long newZxid = ((long)newEpoch << 32); // reset the counter to zero
zxid.set(newZxid);
return newZxid;
}
return zxid.incrementAndGet();
} finally {
rwLock.writeLock().unlock();
}
}
// Send a proposal to every Follower
private List<Future<ACK>> sendToFollowers(Request request, long newZxid)
throws IOException {
List<Future<ACK>> futures = new ArrayList<>();
ProposalPacket proposal = new ProposalPacket(newZxid, request);
ExecutorService executor = Executors.newFixedThreadPool(followers.size(),
r -> {
Thread t = new Thread(r, "proposal-sender-" + serverId);
t.setDaemon(true);
return t;
});
try {
for (var entry : followers.entrySet()) {
final String targetServerId = entry.getKey();
futures.add(executor.submit(() -> {
MDC.put("targetServerId", targetServerId);
try {
ACK ack = networkClient.sendProposal(targetServerId, proposal);
logger.debug("Received ACK from {} for zxid {}",
targetServerId, Long.toHexString(newZxid));
return ack;
} catch (IOException e) {
logger.error("Failed to send proposal to follower {}, zxid: {}",
targetServerId, Long.toHexString(newZxid), e);
return null;
} finally {
MDC.remove("targetServerId");
}
}));
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(200, TimeUnit.MILLISECONDS)) {
List<Runnable> pendingTasks = executor.shutdownNow();
logger.warn("Force shutdown executor with {} pending tasks", pendingTasks.size());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for executor to terminate");
}
}
return futures;
}
// Wait for a quorum of responses
private boolean waitForMajority(List<Future<ACK>> futures)
throws InterruptedException {
int ackCount = 0;
int majority = (followers.size() / 2) + 1;
for (Future<ACK> future : futures) {
try {
ACK ack = future.get(5, TimeUnit.SECONDS);
if (ack != null && ack.isSuccess()) {
ackCount++;
if (ackCount >= majority) {
// Majority acknowledged; we can return early
return true;
}
}
} catch (ExecutionException e) {
logger.warn("Error getting ACK", e.getCause());
} catch (TimeoutException e) {
logger.warn("Timeout waiting for ACK");
}
}
return ackCount >= majority;
}
// Tell every Follower to commit the transaction
private void commit(long zxid) throws IOException {
CommitPacket commit = new CommitPacket(zxid);
for (var entry : followers.entrySet()) {
final String targetServerId = entry.getKey();
CompletableFuture.runAsync(() -> {
MDC.put("targetServerId", targetServerId);
try {
networkClient.sendCommit(targetServerId, commit);
logger.debug("Sent commit to {} for zxid {}",
targetServerId, Long.toHexString(zxid));
} catch (IOException e) {
logger.error("Failed to send commit to follower {}, zxid: {}",
targetServerId, Long.toHexString(zxid), e);
} finally {
MDC.remove("targetServerId");
}
});
}
}
// Send a batched proposal
private List<Future<ACK>> sendBatchToFollowers(BatchRequest batch, long batchZxid)
throws IOException {
ProposalPacket proposal = new ProposalPacket(batchZxid, batch);
return sendProposalToFollowers(proposal, batchZxid);
}
// Commit a batched proposal
private void commitBatch(long batchZxid) throws IOException {
commit(batchZxid);
}
// Send heartbeats to every Follower
private void sendHeartbeats() {
long currentZxid = zxid.get();
for (var entry : followers.entrySet()) {
final String targetServerId = entry.getKey();
CompletableFuture.runAsync(() -> {
try {
networkClient.sendHeartbeat(targetServerId, currentZxid);
} catch (IOException e) {
// Heartbeat failed; rate-limit the log line to avoid flooding
if (heartbeatLogLimiter.tryAcquire()) {
logger.debug("Failed to send heartbeat to {}", targetServerId, e);
}
}
});
}
}
// Send a proposal to every Follower (shared helper)
private List<Future<ACK>> sendProposalToFollowers(ProposalPacket proposal, long zxid)
throws IOException {
List<Future<ACK>> futures = new ArrayList<>();
ExecutorService executor = Executors.newFixedThreadPool(followers.size(),
r -> {
Thread t = new Thread(r, "proposal-sender-" + serverId);
t.setDaemon(true);
return t;
});
try {
for (var entry : followers.entrySet()) {
final String targetServerId = entry.getKey();
futures.add(executor.submit(() -> {
MDC.put("targetServerId", targetServerId);
try {
ACK ack = networkClient.sendProposal(targetServerId, proposal);
logger.debug("Received ACK from {} for zxid {}",
targetServerId, Long.toHexString(zxid));
return ack;
} catch (IOException e) {
logger.error("Failed to send proposal to follower {}, zxid: {}",
targetServerId, Long.toHexString(zxid), e);
return null;
} finally {
MDC.remove("targetServerId");
}
}));
}
} finally {
executor.shutdown();
try {
if (!executor.awaitTermination(200, TimeUnit.MILLISECONDS)) {
List<Runnable> pendingTasks = executor.shutdownNow();
logger.warn("Force shutdown executor with {} pending tasks", pendingTasks.size());
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for executor to terminate");
}
}
return futures;
}
// Read-strategy interface and implementations
private interface ReadStrategy {
CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal);
}
private final Map<ConsistencyLevel, ReadStrategy> readStrategies = new EnumMap<>(ConsistencyLevel.class);
{
// Register the read strategies
readStrategies.put(ConsistencyLevel.LINEARIZABLE, new LinearizableReadStrategy());
readStrategies.put(ConsistencyLevel.SEQUENTIAL, new SequentialReadStrategy());
readStrategies.put(ConsistencyLevel.READ_YOUR_WRITES, new ReadYourWritesStrategy());
readStrategies.put(ConsistencyLevel.BOUNDED_STALENESS, new BoundedStalenessStrategy());
readStrategies.put(ConsistencyLevel.EVENTUAL, new EventualReadStrategy());
}
// Linearizable read strategy
private class LinearizableReadStrategy implements ReadStrategy {
private final AtomicLong leaseExpirationTime = new AtomicLong(0);
private final long leaderLeaseMs = 5000; // 5-second lease
@Override
public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
// The Leader must confirm it is still the Leader (lease mechanism)
if (System.currentTimeMillis() < leaseExpirationTime.get()) {
// Lease still valid; safe to read locally
return CompletableFuture.completedFuture(readFromLocal.get());
} else {
// Lease expired; re-confirm leadership with a quorum
return renewLease().thenApply(renewed -> {
if (renewed) {
return readFromLocal.get();
} else {
throw new ConsistencyException("Cannot guarantee linearizable read");
}
});
}
}
private CompletableFuture<Boolean> renewLease() {
// A real implementation would require confirmation from a quorum here
leaseExpirationTime.set(System.currentTimeMillis() + leaderLeaseMs);
logger.info("Renewed leader lease until {}", leaseExpirationTime.get());
return CompletableFuture.completedFuture(true);
}
}
// Sequential consistency read strategy
private class SequentialReadStrategy implements ReadStrategy {
@Override
public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
// Make sure every committed transaction has been applied first
return ensureAppliedUpToDate()
.thenApply(v -> readFromLocal.get());
}
private CompletableFuture<Void> ensureAppliedUpToDate() {
// A real implementation would block until all committed transactions are applied
logger.debug("Ensuring all committed transactions are applied");
return CompletableFuture.completedFuture(null);
}
}
// Read-your-writes strategy
private class ReadYourWritesStrategy implements ReadStrategy {
private final ConcurrentMap<String, Long> writeTimestamps = new ConcurrentHashMap<>();
@Override
public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
// Check whether this key was recently written
Long writeTime = writeTimestamps.get(key);
if (writeTime != null) {
// Give the write enough time to complete
long elapsed = System.currentTimeMillis() - writeTime;
if (elapsed < 100) { // assume 100ms is enough for the write to land
try {
Thread.sleep(100 - elapsed);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
return CompletableFuture.completedFuture(readFromLocal.get());
}
// Record a write operation
public void recordWrite(String key) {
writeTimestamps.put(key, System.currentTimeMillis());
}
}
// Bounded-staleness strategy
private class BoundedStalenessStrategy implements ReadStrategy {
private final ConcurrentMap<String, CacheEntry> cache = new ConcurrentHashMap<>();
private final long maxStalenessMs = 1000; // tolerate at most 1 second of staleness
@Override
public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
// Check the cache
CacheEntry entry = cache.get(key);
if (entry != null) {
long age = System.currentTimeMillis() - entry.getTimestamp();
if (age <= maxStalenessMs) {
// Cache entry still fresh; return it directly
return CompletableFuture.completedFuture(entry.getResult());
}
}
// Cache entry expired or missing; read locally and refresh the cache
Result result = readFromLocal.get();
cache.put(key, new CacheEntry(result, System.currentTimeMillis()));
return CompletableFuture.completedFuture(result);
}
// Periodically evict expired cache entries
public void cleanup() {
long now = System.currentTimeMillis();
cache.entrySet().removeIf(entry ->
now - entry.getValue().getTimestamp() > maxStalenessMs);
}
}
// Eventual consistency strategy
private class EventualReadStrategy implements ReadStrategy {
@Override
public CompletableFuture<Result> execute(String key, Supplier<Result> readFromLocal) {
// Read locally with no guarantee of seeing the latest write
return CompletableFuture.completedFuture(readFromLocal.get());
}
}
// Cache entry
private static class CacheEntry {
private final Result result;
private final long timestamp;
public CacheEntry(Result result, long timestamp) {
this.result = result;
this.timestamp = timestamp;
}
public Result getResult() {
return result;
}
public long getTimestamp() {
return timestamp;
}
}
@Override
public void close() {
try {
List<Runnable> pendingTasks = scheduler.shutdownNow();
if (!pendingTasks.isEmpty()) {
logger.warn("Scheduler shutdown with {} pending tasks", pendingTasks.size());
}
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
logger.warn("Scheduler did not terminate in time");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for scheduler termination");
}
}
// Circuit breaker implementation (hardened version)
static class CircuitBreaker {
private final AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
private final AtomicLong failureCount = new AtomicLong(0);
private final AtomicLong lastFailureTime = new AtomicLong(0);
private final int threshold;
private final long resetTimeoutMs;
private final StampedLock stateLock = new StampedLock();
private final Logger logger = LoggerFactory.getLogger(CircuitBreaker.class);
public enum State { CLOSED, OPEN, HALF_OPEN }
public CircuitBreaker(int threshold, long resetTimeoutMs) {
this.threshold = threshold;
this.resetTimeoutMs = resetTimeoutMs;
}
public <T> CompletableFuture<T> execute(Supplier<CompletableFuture<T>> action)
throws CircuitBreakerOpenException {
State currentState = getCurrentState();
if (currentState == State.OPEN) {
// Check whether the reset timeout has elapsed and we should probe half-open
if (System.currentTimeMillis() - lastFailureTime.get() > resetTimeoutMs) {
boolean transitioned = tryTransitionState(State.OPEN, State.HALF_OPEN);
if (!transitioned) {
throw new CircuitBreakerOpenException("Circuit breaker is open");
}
currentState = State.HALF_OPEN;
} else {
throw new CircuitBreakerOpenException("Circuit breaker is open");
}
}
final State executionState = currentState;
try {
CompletableFuture<T> future = action.get();
return future.handle((result, ex) -> {
if (ex != null) {
recordFailure();
throw new CompletionException(ex);
} else {
// Success; reset the failure count
if (executionState == State.HALF_OPEN) {
tryTransitionState(State.HALF_OPEN, State.CLOSED);
}
failureCount.set(0);
return result;
}
});
} catch (Exception e) {
recordFailure();
throw e;
}
}
private void recordFailure() {
long stamp = stateLock.writeLock();
try {
long failures = failureCount.incrementAndGet();
lastFailureTime.set(System.currentTimeMillis());
if (failures >= threshold && state.get() == State.CLOSED) {
logger.warn("Circuit breaker opening after {} failures", failures);
state.set(State.OPEN);
}
} finally {
stateLock.unlockWrite(stamp);
}
}
private boolean tryTransitionState(State fromState, State toState) {
long stamp = stateLock.writeLock();
try {
if (state.get() == fromState) {
state.set(toState);
logger.info("Circuit breaker state changed from {} to {}", fromState, toState);
return true;
}
return false;
} finally {
stateLock.unlockWrite(stamp);
}
}
// Read the current state with an optimistic read lock
public State getCurrentState() {
long stamp = stateLock.tryOptimisticRead();
State result = state.get();
if (!stateLock.validate(stamp)) {
stamp = stateLock.readLock();
try {
result = state.get();
} finally {
stateLock.unlockRead(stamp);
}
}
return result;
}
}
// Global exception handler
static class GlobalExceptionHandler {
private static final Logger logger = LoggerFactory.getLogger(GlobalExceptionHandler.class);
public static <T> CompletableFuture<T> withExceptionHandling(CompletableFuture<T> future) {
return future.exceptionally(e -> {
Throwable cause = e instanceof CompletionException ? e.getCause() : e;
if (cause instanceof ConsistencyException) {
logger.error("Consistency error: {}", cause.getMessage());
} else if (cause instanceof IOException) {
logger.error("I/O error: {}", cause.getMessage());
} else if (cause instanceof InterruptedException) {
Thread.currentThread().interrupt();
logger.warn("Operation interrupted");
} else {
logger.error("Unexpected error: {}", cause.getClass().getName(), cause);
}
throw new CompletionException(cause);
});
}
}
// Metrics collection
private static class MetricsCollector {
private final Counter writeRequests;
private final Counter writeSuccess;
private final Counter writeFailed;
private final Counter writeRejected;
private final Counter batchWrites;
private final Counter batchWriteRequests;
private final Counter readRequests;
private final Map<ConsistencyLevel, Counter> readsByLevel = new EnumMap<>(ConsistencyLevel.class);
private final Histogram writeLatency;
private final Histogram batchWriteLatency;
private final Map<ConsistencyLevel, Histogram> readLatency = new EnumMap<>(ConsistencyLevel.class);
public MetricsCollector(String prefix) {
this.writeRequests = Counter.build()
.name(prefix + "_write_requests_total")
.help("Total number of write requests").register();
this.writeSuccess = Counter.build()
.name(prefix + "_write_success_total")
.help("Total number of successful writes").register();
this.writeFailed = Counter.build()
.name(prefix + "_write_failed_total")
.help("Total number of failed writes").register();
this.writeRejected = Counter.build()
.name(prefix + "_write_rejected_total")
.help("Total number of rejected writes").register();
this.batchWrites = Counter.build()
.name(prefix + "_batch_writes_total")
.help("Total number of batch writes").register();
this.batchWriteRequests = Counter.build()
.name(prefix + "_batch_write_requests_total")
.help("Total number of requests in batch writes").register();
this.readRequests = Counter.build()
.name(prefix + "_read_requests_total")
.help("Total number of read requests").register();
this.writeLatency = Histogram.build()
.name(prefix + "_write_latency_ms")
.help("Write latency in milliseconds").register();
this.batchWriteLatency = Histogram.build()
.name(prefix + "_batch_write_latency_ms")
.help("Batch write latency in milliseconds").register();
// Register per-consistency-level counters and histograms
for (ConsistencyLevel level : ConsistencyLevel.values()) {
readsByLevel.put(level, Counter.build()
.name(prefix + "_reads_" + level.name().toLowerCase() + "_total")
.help("Total " + level + " reads").register());
readLatency.put(level, Histogram.build()
.name(prefix + "_read_" + level.name().toLowerCase() + "_latency_ms")
.help(level + " read latency in milliseconds").register());
}
}
public void recordSuccessfulWrite(long latencyMs) {
writeRequests.inc();
writeSuccess.inc();
writeLatency.observe(latencyMs);
}
public void recordFailedWrite() {
writeRequests.inc();
writeFailed.inc();
}
public void recordRejectedWrite() {
writeRequests.inc();
writeRejected.inc();
}
public void recordSuccessfulBatchWrite(int batchSize, long latencyMs) {
batchWrites.inc();
batchWriteRequests.inc(batchSize);
writeRequests.inc(batchSize);
writeSuccess.inc(batchSize);
batchWriteLatency.observe(latencyMs);
}
public void recordFailedBatchWrite(int batchSize) {
batchWrites.inc();
batchWriteRequests.inc(batchSize);
writeRequests.inc(batchSize);
writeFailed.inc(batchSize);
}
public void recordRejectedBatchWrite(int batchSize) {
batchWrites.inc();
batchWriteRequests.inc(batchSize);
writeRequests.inc(batchSize);
writeRejected.inc(batchSize);
}
public void recordRead(ConsistencyLevel level, long latencyMs) {
readRequests.inc();
readsByLevel.get(level).inc();
readLatency.get(level).observe(latencyMs);
}
public void recordFailedRead(ConsistencyLevel level) {
readRequests.inc();
// a dedicated failure counter could be added here
}
}
// Exception classes
public static class CircuitBreakerOpenException extends Exception {
public CircuitBreakerOpenException(String message) {
super(message);
}
}
public static class ConsistencyException extends RuntimeException {
public ConsistencyException(String message) {
super(message);
}
}
public static class ProcessingException extends RuntimeException {
public ProcessingException(String message, Throwable cause) {
super(message, cause);
}
}
// Other inner classes and constants...
enum ConsistencyLevel {
LINEARIZABLE,      // linearizable (strongest)
SEQUENTIAL,        // sequential consistency
READ_YOUR_WRITES,  // read your own writes
BOUNDED_STALENESS, // bounded staleness
EVENTUAL           // eventual consistency (weakest)
}
}
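Here is a sketch of how the broadcast side fits together once this node has won the election. The networkClient, stateMachine, follower ServerData objects, and the Request instance are placeholders, not a fixed API:
// Hypothetical broadcast-mode usage on an elected leader
try (ZABBroadcast broadcast = new ZABBroadcast("server-1",
        new AtomicLong(0), new AtomicInteger(1), networkClient, stateMachine)) {
    broadcast.addFollower(follower2);
    broadcast.addFollower(follower3);
    // A write commits once a majority of followers have ACKed the proposal
    boolean committed = broadcast.processWrite(request).join();
    // Reads pick an explicit point on the consistency/latency spectrum
    Result value = broadcast.readWithConsistency("/config/feature-flag",
            ZABBroadcast.ConsistencyLevel.LINEARIZABLE).join();
}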
The Fast Leader Election Algorithm
public class FastLeaderElection {
private final AtomicLong logicalClock = new AtomicLong(0);
private final ConcurrentMap<String, Vote> receivedVotes = new ConcurrentHashMap<>();
private final String serverId;
private final NetworkManager networkManager;
private final int quorumSize;
private final AtomicInteger electionAttempts = new AtomicInteger(0);
private final Logger logger = LoggerFactory.getLogger(FastLeaderElection.class);
private final ZxidUtils zxidUtils;
public FastLeaderElection(String serverId, int quorumSize,
NetworkManager networkManager, ZxidUtils zxidUtils) {
this.serverId = serverId;
this.quorumSize = quorumSize;
this.networkManager = networkManager;
this.zxidUtils = zxidUtils;
}
public String lookForLeader() throws InterruptedException {
MDC.put("component", "fast-leader-election");
MDC.put("serverId", serverId);
try {
// Advance the logical clock
long newLogicalClock = logicalClock.incrementAndGet();
logger.info("Starting leader election with logical clock: {}", newLogicalClock);
// Initialize our ballot by voting for ourselves
Vote vote = new Vote(serverId, zxidUtils.getLastZxid(), newLogicalClock);
receivedVotes.clear();
receivedVotes.put(serverId, vote);
// Broadcast our vote to every other server
networkManager.broadcastVote(vote);
// Election timeout bounds
long startTime = System.currentTimeMillis();
long maxTimeout = 60000; // 60-second upper bound
// Election loop
Map<String, Integer> voteCounter = new HashMap<>();
String currentLeader = null;
while (System.currentTimeMillis() - startTime < maxTimeout) {
// Receive a vote
Vote receivedVote = networkManager.receiveVote(200); // 200ms poll timeout
if (receivedVote != null) {
MDC.put("candidateId", receivedVote.getServerId());
logger.debug("Received vote from {}: zxid={}, logicalClock={}",
receivedVote.getServerId(),
Long.toHexString(receivedVote.getZxid()),
receivedVote.getLogicalClock());
// Validate the logical clock
if (receivedVote.getLogicalClock() > newLogicalClock) {
// A higher logical clock was seen: adopt it and restart the election
logicalClock.set(receivedVote.getLogicalClock());
logger.info("Found higher logical clock: {}, restarting election",
receivedVote.getLogicalClock());
MDC.remove("candidateId");
electionAttempts.set(0); // reset the attempt counter
return lookForLeader(); // restart the election
} else if (receivedVote.getLogicalClock() < newLogicalClock) {
// Ignore votes from an older logical clock
logger.debug("Ignoring vote with older logical clock: {}",
receivedVote.getLogicalClock());
MDC.remove("candidateId");
continue;
}
// Compare ballots
int comparison = compareVotes(vote, receivedVote);
if (comparison < 0) {
// The incoming vote is better; adopt it as our own
vote = new Vote(receivedVote.getServerId(),
receivedVote.getZxid(),
newLogicalClock);
// Re-broadcast the updated vote
networkManager.broadcastVote(vote);
logger.info("Updated vote to server: {}", vote.getServerId());
}
// Record the received vote
receivedVotes.put(receivedVote.getServerId(), receivedVote);
MDC.remove("candidateId");
// Tally the votes
voteCounter.clear();
for (Vote v : receivedVotes.values()) {
String candidate = v.getServerId();
voteCounter.put(candidate, voteCounter.getOrDefault(candidate, 0) + 1);
// Check whether any candidate has quorum support
if (voteCounter.get(candidate) >= quorumSize) {
currentLeader = candidate;
logger.info("Elected leader: {} with {} votes of {} required",
candidate, voteCounter.get(candidate), quorumSize);
break;
}
}
if (currentLeader != null) {
break; // a Leader has been elected
}
}
}
if (currentLeader == null) {
// Election failed; back off exponentially to avoid livelock
handleElectionFailure();
logger.warn("Failed to elect a leader, retrying...");
return lookForLeader(); // retry
}
electionAttempts.set(0); // reset the attempt counter
return currentLeader;
} catch (Exception e) {
logger.error("Error during leader election", e);
// Count the failed attempt and back off before rethrowing
handleElectionFailure();
throw new LeaderElectionException("Leader election failed", e);
} finally {
MDC.remove("component");
MDC.remove("serverId");
}
}
// Handle election failure with exponential backoff to avoid livelock
private void handleElectionFailure() {
int attempts = electionAttempts.incrementAndGet();
// Exponential backoff
int backoffMs = Math.min(1000 * (1 << Math.min(attempts, 10)), 30000);
// Add random jitter so nodes do not retry in lockstep
backoffMs += ThreadLocalRandom.current().nextInt(backoffMs / 2);
logger.info("Election attempt {} failed, backing off for {}ms", attempts, backoffMs);
try {
Thread.sleep(backoffMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted during election backoff");
}
}
// Compare two votes: negative means v2 is better, 0 means equal, positive means v1 is better
private int compareVotes(Vote v1, Vote v2) {
// Compare zxids first; a higher zxid has higher priority
long zxidDiff = ZxidUtils.compareZxid(v1.getZxid(), v2.getZxid());
if (zxidDiff != 0) {
return Long.signum(zxidDiff);
}
// zxids are equal; fall back to comparing serverIds
return v1.getServerId().compareTo(v2.getServerId());
}
// Inner classes and helper methods...
static class Vote {
private final String serverId;
private final long zxid;
private final long logicalClock;
public Vote(String serverId, long zxid, long logicalClock) {
this.serverId = serverId;
this.zxid = zxid;
this.logicalClock = logicalClock;
}
public String getServerId() {
return serverId;
}
public long getZxid() {
return zxid;
}
public long getLogicalClock() {
return logicalClock;
}
@Override
public String toString() {
return "Vote{serverId='" + serverId + "', zxid=" + Long.toHexString(zxid) +
", logicalClock=" + logicalClock + '}';
}
}
// Custom exception class
public static class LeaderElectionException extends RuntimeException {
public LeaderElectionException(String message, Throwable cause) {
super(message, cause);
}
}
}
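A usage sketch for the election; the NetworkManager and ZxidUtils instances are assumed to be wired up elsewhere:
// Hypothetical election run on a 5-node ensemble (quorum of 3)
FastLeaderElection election = new FastLeaderElection("server-1", 3, networkManager, zxidUtils);
try {
    String leaderId = election.lookForLeader();
    if ("server-1".equals(leaderId)) {
        // We won: run ZAB recovery, then switch to broadcast mode
    } else {
        // Otherwise connect to the elected leader as a Follower
    }
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}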
Example Network Client Implementation
public class NettyNetworkClient implements NetworkClient {
private final EventLoopGroup workerGroup;
private final Bootstrap bootstrap;
private final ConcurrentMap<String, Channel> channels = new ConcurrentHashMap<>();
private final int connectionTimeoutMs;
private final Logger logger = LoggerFactory.getLogger(NettyNetworkClient.class);
public NettyNetworkClient(int connectionTimeoutMs) {
this.connectionTimeoutMs = connectionTimeoutMs;
this.workerGroup = new NioEventLoopGroup();
this.bootstrap = new Bootstrap()
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectionTimeoutMs)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline()
.addLast(new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
.addLast(new LengthFieldPrepender(4))
.addLast(new PacketEncoder())
.addLast(new PacketDecoder())
.addLast(new ClientHandler());
}
});
}
@Override
public void connect(String serverId, String address, int port) throws IOException {
try {
ChannelFuture future = bootstrap.connect(address, port);
boolean connected = future.await(connectionTimeoutMs, TimeUnit.MILLISECONDS);
if (!connected || !future.isSuccess()) {
throw new IOException("Failed to connect to " + serverId + " at " +
address + ":" + port);
}
channels.put(serverId, future.channel());
logger.info("Connected to server: {} at {}:{}", serverId, address, port);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while connecting to " + serverId, e);
} catch (Exception e) {
throw new IOException("Failed to connect to " + serverId, e);
}
}
@Override
public void disconnect(String serverId) {
Channel channel = channels.remove(serverId);
if (channel != null) {
channel.close();
logger.info("Disconnected from server: {}", serverId);
}
}
@Override
public ACK sendProposal(String serverId, ProposalPacket proposal) throws IOException {
MDC.put("targetServerId", serverId);
try {
Channel channel = getChannel(serverId);
RequestFuture<ACK> future = new RequestFuture<>();
// Register the request-response mapping
Long requestId = generateRequestId();
RequestRegistry.register(requestId, future);
// Wrap the request
Request request = new Request(requestId, RequestType.PROPOSAL, proposal);
// Send the request
channel.writeAndFlush(request).sync();
// Wait for the response
ACK ack = future.get(5, TimeUnit.SECONDS);
if (ack == null) {
throw new IOException("Received null ACK from " + serverId);
}
return ack;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while sending proposal to " + serverId, e);
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for ACK from " + serverId, e);
} catch (ExecutionException e) {
throw new IOException("Error sending proposal to " + serverId, e.getCause());
} finally {
MDC.remove("targetServerId");
}
}
@Override
public void sendCommit(String serverId, CommitPacket commit) throws IOException {
MDC.put("targetServerId", serverId);
try {
Channel channel = getChannel(serverId);
// Wrap the request
Request request = new Request(generateRequestId(), RequestType.COMMIT, commit);
// Send the request without waiting for a response
channel.writeAndFlush(request);
} catch (Exception e) {
throw new IOException("Error sending commit to " + serverId, e);
} finally {
MDC.remove("targetServerId");
}
}
@Override
public LastZxidResponse sendEpochRequest(String serverId, EpochPacket epochPkt)
throws IOException {
MDC.put("targetServerId", serverId);
try {
Channel channel = getChannel(serverId);
RequestFuture<LastZxidResponse> future = new RequestFuture<>();
// Register the request-response mapping
Long requestId = generateRequestId();
RequestRegistry.register(requestId, future);
// Wrap the request
Request request = new Request(requestId, RequestType.EPOCH, epochPkt);
// Send the request
channel.writeAndFlush(request).sync();
// Wait for the response
LastZxidResponse response = future.get(5, TimeUnit.SECONDS);
if (response == null) {
throw new IOException("Received null LastZxidResponse from " + serverId);
}
return response;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IOException("Interrupted while sending epoch request to " + serverId, e);
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for LastZxidResponse from " + serverId, e);
} catch (ExecutionException e) {
throw new IOException("Error sending epoch request to " + serverId, e.getCause());
} finally {
MDC.remove("targetServerId");
}
}
// Other interface methods omitted...
@Override
public void sendSnapshot(String serverId, byte[] snapshot, long zxid) throws IOException {
MDC.put("targetServerId", serverId);
try {
Channel channel = getChannel(serverId);
// Snapshots can be large, so send them in chunks
int chunkSize = 1024 * 1024; // 1MB chunks
int totalChunks = (snapshot.length + chunkSize - 1) / chunkSize;
logger.info("Sending snapshot to {}, size: {} bytes, chunks: {}",
serverId, snapshot.length, totalChunks);
// Send the snapshot metadata first
SnapshotMetadata metadata = new SnapshotMetadata(zxid, snapshot.length, totalChunks);
Request metadataRequest = new Request(generateRequestId(),
RequestType.SNAPSHOT_META, metadata);
channel.writeAndFlush(metadataRequest).sync();
// Send the snapshot data chunk by chunk
for (int i = 0; i < totalChunks; i++) {
int offset = i * chunkSize;
int length = Math.min(chunkSize, snapshot.length - offset);
byte[] chunk = new byte[length];
System.arraycopy(snapshot, offset, chunk, 0, length);
SnapshotChunk snapshotChunk = new SnapshotChunk(i, totalChunks, chunk);
Request chunkRequest = new Request(generateRequestId(),
RequestType.SNAPSHOT_CHUNK, snapshotChunk);
channel.writeAndFlush(chunkRequest).sync();
if (i % 10 == 0 || i == totalChunks - 1) {
logger.debug("Sent snapshot chunk {}/{} to {}",
i + 1, totalChunks, serverId);
}
}
logger.info("Snapshot sent successfully to {}", serverId);
} catch (Exception e) {
throw new IOException("Error sending snapshot to " + serverId, e);
} finally {
MDC.remove("targetServerId");
}
}
// Get the channel connected to the given server
private Channel getChannel(String serverId) throws IOException {
Channel channel = channels.get(serverId);
if (channel == null || !channel.isActive()) {
throw new IOException("No active connection to server: " + serverId);
}
return channel;
}
// Generate a unique request id
private static final AtomicLong requestIdGenerator = new AtomicLong(0);
private static Long generateRequestId() {
return requestIdGenerator.incrementAndGet();
}
// Shut down the client
public void shutdown() {
// Close every connection
for (Channel channel : channels.values()) {
channel.close();
}
channels.clear();
// Shut down the event loop group
workerGroup.shutdownGracefully();
}
// Request types
enum RequestType {
PROPOSAL, COMMIT, EPOCH, TRUNCATE, TRANSACTION, NEWLEADER, HEARTBEAT,
SNAPSHOT_META, SNAPSHOT_CHUNK
}
// Request object
static class Request {
private final Long id;
private final RequestType type;
private final Object payload;
public Request(Long id, RequestType type, Object payload) {
this.id = id;
this.type = type;
this.payload = payload;
}
public Long getId() {
return id;
}
public RequestType getType() {
return type;
}
public Object getPayload() {
return payload;
}
}
// Snapshot metadata
static class SnapshotMetadata {
private final long zxid;
private final int totalSize;
private final int totalChunks;
public SnapshotMetadata(long zxid, int totalSize, int totalChunks) {
this.zxid = zxid;
this.totalSize = totalSize;
this.totalChunks = totalChunks;
}
public long getZxid() {
return zxid;
}
public int getTotalSize() {
return totalSize;
}
public int getTotalChunks() {
return totalChunks;
}
}
// Snapshot data chunk
static class SnapshotChunk {
private final int chunkIndex;
private final int totalChunks;
private final byte[] data;
public SnapshotChunk(int chunkIndex, int totalChunks, byte[] data) {
this.chunkIndex = chunkIndex;
this.totalChunks = totalChunks;
this.data = data.clone(); // defensive copy
}
public int getChunkIndex() {
return chunkIndex;
}
public int getTotalChunks() {
return totalChunks;
}
public byte[] getData() {
return data.clone(); // defensive copy
}
}
// Request-response mapping registry
static class RequestRegistry {
private static final ConcurrentMap<Long, RequestFuture<?>> futures = new ConcurrentHashMap<>();
public static <T> void register(Long requestId, RequestFuture<T> future) {
futures.put(requestId, future);
}
@SuppressWarnings("unchecked")
public static <T> void complete(Long requestId, T response) {
RequestFuture<T> future = (RequestFuture<T>) futures.remove(requestId);
if (future != null) {
future.complete(response);
}
}
public static void completeExceptionally(Long requestId, Throwable exception) {
RequestFuture<?> future = futures.remove(requestId);
if (future != null) {
future.completeExceptionally(exception);
}
}
}
// Request future
static class RequestFuture<T> extends CompletableFuture<T> {
// Inherits everything it needs from CompletableFuture
}
// Client-side channel handler
private class ClientHandler extends SimpleChannelInboundHandler<Response> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Response response) {
Long requestId = response.getRequestId();
if (response.isSuccess()) {
RequestRegistry.complete(requestId, response.getPayload());
} else {
RequestRegistry.completeExceptionally(requestId,
new IOException("Request failed: " + response.getErrorMessage()));
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("Network client exception", cause);
ctx.close();
}
}
// Response object
static class Response {
private final Long requestId;
private final boolean success;
private final Object payload;
private final String errorMessage;
public Response(Long requestId, boolean success, Object payload, String errorMessage) {
this.requestId = requestId;
this.success = success;
this.payload = payload;
this.errorMessage = errorMessage;
}
public Long getRequestId() {
return requestId;
}
public boolean isSuccess() {
return success;
}
public Object getPayload() {
return payload;
}
public String getErrorMessage() {
return errorMessage;
}
}
// Encoder
static class PacketEncoder extends MessageToByteEncoder<Request> {
@Override
protected void encode(ChannelHandlerContext ctx, Request msg, ByteBuf out) throws Exception {
// Use protocol buffers or a custom serializer;
// simplified here for the example
byte[] bytes = serializeRequest(msg);
out.writeBytes(bytes);
}
private byte[] serializeRequest(Request request) {
// A real implementation would use a proper serialization mechanism;
// simplified here for the example
return new byte[0];
}
}
// Decoder
static class PacketDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// The upstream LengthFieldBasedFrameDecoder has already framed the message and
// stripped the 4-byte length prefix, so consume the whole frame here; reading
// another length field at this point, as the original code did, would
// double-decode the framing
byte[] data = new byte[in.readableBytes()];
in.readBytes(data);
Response response = deserializeResponse(data);
out.add(response);
}
private Response deserializeResponse(byte[] data) {
// A real implementation would use a proper deserialization mechanism;
// simplified here for the example
return null;
}
}
}
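Typical client usage might look like the following sketch; the server id, address, port, and packet contents are illustrative:
NettyNetworkClient client = new NettyNetworkClient(3000); // 3-second connect timeout
try {
    client.connect("server-2", "10.0.0.2", 2888);
    // The leader proposes, then commits once it has gathered a quorum of ACKs
    ACK ack = client.sendProposal("server-2", proposal);
    if (ack.isSuccess()) {
        client.sendCommit("server-2", new CommitPacket(zxid));
    }
} catch (IOException e) {
    // Connection-level failures surface as IOException; the caller decides whether to retry
} finally {
    client.shutdown();
}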
3. Paxos Algorithm Implementation
Core Interface Definitions
// Role interfaces
public interface Proposer {
CompletableFuture<Boolean> prepare(int ballot);
CompletableFuture<Boolean> propose(int ballot, Object value);
}
public interface Acceptor {
CompletableFuture<Promise> handlePrepare(int ballot);
CompletableFuture<Accepted> handleAccept(int ballot, Object value);
}
public interface Learner {
void learn(long instanceId, int ballot, Object value);
}
public interface NetworkClient {
CompletableFuture<Promise> sendPrepare(int nodeId, int ballot);
CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value);
void sendLearn(int nodeId, long instanceId, int ballot, Object value);
CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot);
CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId);
}
public interface StateMachine {
CompletableFuture<Void> apply(long instanceId, byte[] command);
long getLastApplied();
CompletableFuture<byte[]> takeSnapshot();
CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId);
}
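The Promise and Accepted message types used by the node below are not spelled out in this article. Here is a minimal sketch inferred from how BasicPaxosNode calls them; the field names are assumptions:
// Reply to a Prepare: the promise plus any proposal this acceptor already accepted
public class Promise {
    private boolean ok;            // whether the promise was granted
    private int acceptedBallot;    // ballot of the last accepted proposal, if any
    private Object acceptedValue;  // value of that proposal, or null if none
    public boolean isOk() { return ok; }
    public void setOk(boolean ok) { this.ok = ok; }
    public int getAcceptedBallot() { return acceptedBallot; }
    public void setAcceptedBallot(int acceptedBallot) { this.acceptedBallot = acceptedBallot; }
    public Object getAcceptedValue() { return acceptedValue; }
    public void setAcceptedValue(Object acceptedValue) { this.acceptedValue = acceptedValue; }
}
// Reply to an Accept request
public class Accepted {
    private boolean ok;            // whether the proposal was accepted
    public boolean isOk() { return ok; }
    public void setOk(boolean ok) { this.ok = ok; }
}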
Basic Paxos Implementation
public class BasicPaxosNode implements Proposer, Acceptor, Learner, AutoCloseable {
private final int nodeId;
private final AtomicInteger ballot = new AtomicInteger(0);
private volatile Object proposalValue = null;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private volatile int promisedBallot = 0;  // highest ballot this acceptor has promised
private volatile int acceptedBallot = 0;  // ballot of the last accepted proposal
private volatile Object acceptedValue = null;
private final int totalNodes;
private final NetworkClient networkClient;
private final Logger logger = LoggerFactory.getLogger(BasicPaxosNode.class);
private final RetryStrategy retryStrategy;
private final MetricsCollector metrics;
public BasicPaxosNode(int nodeId, int totalNodes, NetworkClient networkClient) {
this.nodeId = nodeId;
this.totalNodes = totalNodes;
this.networkClient = networkClient;
this.retryStrategy = new ExponentialBackoffRetry(100, 5000, 3);
this.metrics = new MetricsCollector("paxos_basic", nodeId);
}
// Proposer: prepare phase
@Override
public CompletableFuture<Boolean> prepare(int suggestedBallot) {
final int newBallot = suggestedBallot > 0 ? suggestedBallot : generateNewBallot();
final Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "paxos-proposer");
MDC.put("nodeId", String.valueOf(nodeId));
MDC.put("ballot", String.valueOf(newBallot));
logger.info("Starting prepare phase with ballot {}", newBallot);
CompletableFuture<Boolean> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
try {
// Send a Prepare request to every Acceptor
List<CompletableFuture<Promise>> futures = sendPrepare(newBallot);
// Collect the results
List<Promise> promises = new ArrayList<>();
for (CompletableFuture<Promise> future : futures) {
try {
Promise promise = future.get(3, TimeUnit.SECONDS);
if (promise != null) {
promises.add(promise);
}
} catch (Exception e) {
logger.warn("Error getting prepare response", e);
}
}
// If a quorum of acceptors responded positively
int quorum = getQuorum();
int okCount = (int) promises.stream().filter(Promise::isOk).count();
if (okCount >= quorum) {
// Update our ballot
ballot.updateAndGet(current -> Math.max(current, newBallot));
// Adopt the value of the highest-ballot proposal already accepted, if any
Promise highestPromise = selectHighestBallotPromise(promises);
rwLock.writeLock().lock();
try {
if (highestPromise != null && highestPromise.getAcceptedValue() != null) {
proposalValue = highestPromise.getAcceptedValue();
logger.info("Using previously accepted value: {}", proposalValue);
}
} finally {
rwLock.writeLock().unlock();
}
metrics.recordPrepareSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS));
result.complete(true);
} else {
logger.info("Failed to get quorum in prepare phase: {} of {} responses ok",
okCount, promises.size());
metrics.recordPrepareFailed();
result.complete(false);
}
} catch (Exception e) {
logger.error("Error in prepare phase", e);
metrics.recordPrepareFailed();
result.completeExceptionally(e);
} finally {
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("ballot");
}
});
return result;
}
// Proposer: accept phase
@Override
public CompletableFuture<Boolean> propose(int ballot, Object value) {
final Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "paxos-proposer");
MDC.put("nodeId", String.valueOf(nodeId));
MDC.put("ballot", String.valueOf(ballot));
return prepare(ballot).thenCompose(prepared -> {
if (!prepared) {
logger.info("Prepare phase failed, cannot proceed to propose");
metrics.recordProposeFailed();
return CompletableFuture.completedFuture(false);
}
// Decide which value to propose
final Object valueToPropose;
rwLock.readLock().lock();
try {
// If the prepare phase found no previously accepted value, use our own
valueToPropose = proposalValue != null ? proposalValue : value;
logger.info("Starting accept phase with ballot {} and value {}",
ballot, valueToPropose);
} finally {
rwLock.readLock().unlock();
}
return CompletableFuture.supplyAsync(() -> {
try {
// Send an Accept request to every Acceptor
List<CompletableFuture<Accepted>> futures = sendAccept(ballot, valueToPropose);
// Collect the results
List<Accepted> responses = new ArrayList<>();
for (CompletableFuture<Accepted> future : futures) {
try {
Accepted accepted = future.get(3, TimeUnit.SECONDS);
if (accepted != null) {
responses.add(accepted);
}
} catch (Exception e) {
logger.warn("Error getting accept response", e);
}
}
// If a quorum of acceptors accepted
int quorum = getQuorum();
int accepted = (int) responses.stream().filter(Accepted::isOk).count();
boolean success = accepted >= quorum;
if (success) {
logger.info("Value {} has been accepted by the majority ({} of {})",
valueToPropose, accepted, responses.size());
// Notify all Learners (Basic Paxos decides a single instance, so the instance id is fixed at 1)
broadcastToLearners(1, ballot, valueToPropose);
metrics.recordProposeSuccess(stopwatch.elapsed(TimeUnit.MILLISECONDS));
} else {
logger.info("Failed to get quorum in accept phase: {} of {} responses ok",
accepted, responses.size());
metrics.recordProposeFailed();
}
return success;
} catch (Exception e) {
logger.error("Error in propose phase", e);
metrics.recordProposeFailed();
throw new CompletionException(e);
} finally {
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("ballot");
}
});
}).exceptionally(e -> {
logger.error("Failed to propose value", e);
metrics.recordProposeFailed();
return false;
});
}
// Acceptor: 處理Prepare請求
@Override
public CompletableFuture<Promise> handlePrepare(int proposalBallot) {
MDC.put("component", "paxos-acceptor");
MDC.put("nodeId", String.valueOf(nodeId));
MDC.put("ballot", String.valueOf(proposalBallot));
return CompletableFuture.supplyAsync(() -> {
Promise promise = new Promise();
rwLock.writeLock().lock();
try {
if (proposalBallot > acceptedBallot) {
// 承諾不再接受編號小於proposalBallot的提案
// 注意:必須在更新前記下已接受的ballot,返回舊值給Proposer;
// 否則所有Promise都會帶上新ballot,Proposer無法正確選出最高編號的已接受值。
// 本簡化實現將promised/accepted合併為一個字段,完整實現應分開記錄
int prevAcceptedBallot = acceptedBallot;
acceptedBallot = proposalBallot;
promise.setOk(true);
promise.setAcceptedBallot(prevAcceptedBallot);
promise.setAcceptedValue(this.acceptedValue);
logger.info("Acceptor {} promised ballot {}", nodeId, proposalBallot);
metrics.recordPromiseMade();
} else {
promise.setOk(false);
logger.info("Acceptor {} rejected ballot {}, current ballot: {}",
nodeId, proposalBallot, acceptedBallot);
metrics.recordPromiseRejected();
}
return promise;
} finally {
rwLock.writeLock().unlock();
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("ballot");
}
});
}
// Acceptor: 處理Accept請求
@Override
public CompletableFuture<Accepted> handleAccept(int proposalBallot, Object proposalValue) {
MDC.put("component", "paxos-acceptor");
MDC.put("nodeId", String.valueOf(nodeId));
MDC.put("ballot", String.valueOf(proposalBallot));
return CompletableFuture.supplyAsync(() -> {
Accepted accepted = new Accepted();
rwLock.writeLock().lock();
try {
if (proposalBallot >= acceptedBallot) {
acceptedBallot = proposalBallot;
acceptedValue = proposalValue;
accepted.setOk(true);
logger.info("Acceptor {} accepted ballot {} with value {}",
nodeId, proposalBallot, proposalValue);
metrics.recordAcceptMade();
} else {
accepted.setOk(false);
logger.info("Acceptor {} rejected accept for ballot {}, current ballot: {}",
nodeId, proposalBallot, acceptedBallot);
metrics.recordAcceptRejected();
}
return accepted;
} finally {
rwLock.writeLock().unlock();
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("ballot");
}
});
}
// Learner: 學習已決議的值
@Override
public void learn(long instanceId, int ballot, Object value) {
MDC.put("component", "paxos-learner");
MDC.put("nodeId", String.valueOf(nodeId));
MDC.put("instanceId", String.valueOf(instanceId));
MDC.put("ballot", String.valueOf(ballot));
try {
logger.info("Learner {} learned value {} for instance {} with ballot {}",
nodeId, value, instanceId, ballot);
metrics.recordLearnReceived();
// 實際實現中,這裏會將學習到的值應用到狀態機
// applyToStateMachine(instanceId, value);
} finally {
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("instanceId");
MDC.remove("ballot");
}
}
// 發送Prepare請求給所有Acceptor
private List<CompletableFuture<Promise>> sendPrepare(int newBallot) {
List<CompletableFuture<Promise>> futures = new ArrayList<>();
for (int i = 0; i < totalNodes; i++) {
final int targetNodeId = i;
if (targetNodeId == this.nodeId) {
// 處理本地請求
futures.add(handlePrepare(newBallot));
} else {
// 發送遠程請求
futures.add(networkClient.sendPrepare(targetNodeId, newBallot)
.exceptionally(e -> {
logger.error("Failed to send prepare to node {}", targetNodeId, e);
return null;
}));
}
}
return futures;
}
// 發送Accept請求給所有Acceptor
private List<CompletableFuture<Accepted>> sendAccept(int ballot, Object value) {
List<CompletableFuture<Accepted>> futures = new ArrayList<>();
for (int i = 0; i < totalNodes; i++) {
final int targetNodeId = i;
if (targetNodeId == this.nodeId) {
// 處理本地請求
futures.add(handleAccept(ballot, value));
} else {
// 發送遠程請求
futures.add(networkClient.sendAccept(targetNodeId, ballot, value)
.exceptionally(e -> {
logger.error("Failed to send accept to node {}", targetNodeId, e);
return null;
}));
}
}
return futures;
}
// 通知所有Learner已決議的值
private void broadcastToLearners(long instanceId, int ballot, Object value) {
for (int i = 0; i < totalNodes; i++) {
final int targetNodeId = i;
if (targetNodeId == this.nodeId) {
// 本地學習
learn(instanceId, ballot, value);
} else {
// 異步通知其他Learner
CompletableFuture.runAsync(() -> {
try {
networkClient.sendLearn(targetNodeId, instanceId, ballot, value);
} catch (Exception e) {
logger.error("Failed to notify learner {}", targetNodeId, e);
}
});
}
}
}
// 選擇最高ballot的Promise
private Promise selectHighestBallotPromise(List<Promise> promises) {
return promises.stream()
.filter(p -> p.isOk() && p.getAcceptedValue() != null)
.max(Comparator.comparingInt(Promise::getAcceptedBallot))
.orElse(null);
}
// 生成比當前更大的提案編號 (加入節點ID保證唯一性)
private int generateNewBallot() {
// 確保新ballot大於之前的,並且保證不同節點的ballot唯一
return ballot.incrementAndGet() * totalNodes + nodeId;
}
// 獲取多數派數量
private int getQuorum() {
return totalNodes / 2 + 1;
}
@Override
public void close() {
// 釋放資源
metrics.close();
}
// Promise類
public static class Promise {
private boolean ok;
private int acceptedBallot;
private Object acceptedValue;
public boolean isOk() {
return ok;
}
public void setOk(boolean ok) {
this.ok = ok;
}
public int getAcceptedBallot() {
return acceptedBallot;
}
public void setAcceptedBallot(int acceptedBallot) {
this.acceptedBallot = acceptedBallot;
}
public Object getAcceptedValue() {
return acceptedValue;
}
public void setAcceptedValue(Object acceptedValue) {
this.acceptedValue = acceptedValue;
}
}
// Accepted類
public static class Accepted {
private boolean ok;
public boolean isOk() {
return ok;
}
public void setOk(boolean ok) {
this.ok = ok;
}
}
// 指標收集類
private static class MetricsCollector implements AutoCloseable {
// 指標定義...
public MetricsCollector(String prefix, int nodeId) {
// 初始化指標...
}
public void recordPrepareSuccess(long latencyMs) {
// 記錄準備階段成功
}
public void recordPrepareFailed() {
// 記錄準備階段失敗
}
public void recordProposeSuccess(long latencyMs) {
// 記錄提議階段成功
}
public void recordProposeFailed() {
// 記錄提議階段失敗
}
public void recordPromiseMade() {
// 記錄承諾次數
}
public void recordPromiseRejected() {
// 記錄拒絕承諾次數
}
public void recordAcceptMade() {
// 記錄接受次數
}
public void recordAcceptRejected() {
// 記錄拒絕接受次數
}
public void recordLearnReceived() {
// 記錄學習次數
}
@Override
public void close() {
// 清理資源
}
}
// 異常處理與重試策略
interface RetryStrategy {
<T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action);
}
// 指數退避重試策略
static class ExponentialBackoffRetry implements RetryStrategy {
private final long initialBackoffMs;
private final long maxBackoffMs;
private final int maxRetries;
private final Logger logger = LoggerFactory.getLogger(ExponentialBackoffRetry.class);
public ExponentialBackoffRetry(long initialBackoffMs, long maxBackoffMs, int maxRetries) {
this.initialBackoffMs = initialBackoffMs;
this.maxBackoffMs = maxBackoffMs;
this.maxRetries = maxRetries;
}
@Override
public <T> CompletableFuture<T> retry(Supplier<CompletableFuture<T>> action) {
return retryInternal(action, 0);
}
private <T> CompletableFuture<T> retryInternal(Supplier<CompletableFuture<T>> action,
int attempt) {
CompletableFuture<T> result = new CompletableFuture<>();
action.get().whenComplete((value, e) -> {
if (e == null) {
result.complete(value);
} else if (attempt >= maxRetries) {
result.completeExceptionally(
new RetryExhaustedException("Max retries exceeded", e));
} else {
// 指數退避並加入隨機抖動,避免多個節點同步重試
long backoff = Math.min(initialBackoffMs * (1L << attempt), maxBackoffMs);
backoff += ThreadLocalRandom.current().nextLong(Math.max(1, backoff / 5) + 1);
logger.info("Retry attempt {} after {}ms due to: {}",
attempt + 1, backoff, e.getMessage());
// delayedExecutor返回的是Executor,execute沒有返回值,
// 因此通過外層result串聯重試結果,而不是對void調用join
CompletableFuture.delayedExecutor(backoff, TimeUnit.MILLISECONDS).execute(() ->
retryInternal(action, attempt + 1).whenComplete((v, e2) -> {
if (e2 == null) {
result.complete(v);
} else {
result.completeExceptionally(e2);
}
}));
}
});
return result;
}
}
}
// 自定義異常類
public static class RetryExhaustedException extends RuntimeException {
public RetryExhaustedException(String message, Throwable cause) {
super(message, cause);
}
}
}
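上述實現的典型調用方式如下(純屬示意:類名 BasicPaxosNode 與工廠方法 createNode 都是假設的,實際以上文類的構造函數為準):
// 假設集羣共3個節點,本節點ID為0
BasicPaxosNode node = createNode(0, 3); // 假設的工廠方法,內部注入網絡層與指標收集器
// ballot構造與generateNewBallot一致:輪次 * 節點總數 + 節點ID
int ballot = 1 * 3 + 0;
node.propose(ballot, "value-A")
    .thenAccept(ok -> System.out.println(
        ok ? "value-A 已被多數派接受" : "提議失敗,應提高ballot後重試"))
    .join();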
Multi-Paxos 實現
下面實現了 Multi-Paxos 的組件化架構,通過分離關注點提高代碼的可維護性:
public class MultiPaxosSystem {
private final int nodeId;
private final Configuration config;
private final MultiPaxosLog log;
private final MultiPaxosStateMachine stateMachine;
private final MultiPaxosNetworking networking;
private final RoleManager roleManager;
private final ScheduledExecutorService scheduler;
private final Logger logger = LoggerFactory.getLogger(MultiPaxosSystem.class);
public MultiPaxosSystem(int nodeId, Configuration config) {
this.nodeId = nodeId;
this.config = config;
this.log = new MultiPaxosLog();
this.stateMachine = new MultiPaxosStateMachine();
this.networking = new MultiPaxosNetworking(nodeId, config.getNodes());
this.roleManager = new RoleManager(this);
this.scheduler = Executors.newScheduledThreadPool(2, r -> {
Thread t = new Thread(r, "multi-paxos-scheduler-" + nodeId);
t.setDaemon(true);
return t;
});
// 啓動日誌應用線程
scheduler.scheduleWithFixedDelay(this::applyCommittedLogs, 100, 100, TimeUnit.MILLISECONDS);
// 啓動Leader租約檢查
scheduler.scheduleWithFixedDelay(this::checkLeaderLease, 1000, 1000, TimeUnit.MILLISECONDS);
}
// 客户端API
// 追加新日誌(寫操作)
public CompletableFuture<Boolean> appendLog(byte[] command) {
if (!roleManager.isLeader()) {
return CompletableFuture.failedFuture(
new NotLeaderException("Not the leader", roleManager.getLeaderHint()));
}
return roleManager.getLeaderRole().appendLog(command);
}
// 讀取操作
public CompletableFuture<byte[]> read(String key, ConsistencyLevel level) {
switch (level) {
case LINEARIZABLE:
if (!roleManager.isLeader()) {
return CompletableFuture.failedFuture(
new NotLeaderException("Not the leader", roleManager.getLeaderHint()));
}
return roleManager.getLeaderRole().linearizableRead(key);
case SEQUENTIAL:
return roleManager.getFollowerRole().sequentialRead(key);
case EVENTUAL:
default:
return roleManager.getFollowerRole().eventualRead(key);
}
}
// 嘗試成為Leader
public CompletableFuture<Boolean> electSelf() {
return roleManager.electSelf();
}
// 日誌應用
private void applyCommittedLogs() {
try {
long applied = stateMachine.getLastApplied();
long toApply = log.getCommitIndex();
if (applied >= toApply) {
return; // 已全部應用
}
List<CompletableFuture<Void>> applyFutures = new ArrayList<>();
// 應用從applied+1到toApply的所有日誌
for (long i = applied + 1; i <= toApply; i++) {
final long instanceId = i;
LogEntry entry = log.getEntry(instanceId);
if (entry != null && entry.isCommitted()) {
applyFutures.add(
stateMachine.apply(instanceId, entry.getCommand())
.thenRun(() -> {
logger.debug("Applied log entry at instance {} to state machine",
instanceId);
})
.exceptionally(e -> {
logger.error("Failed to apply log at instance {}", instanceId, e);
return null;
})
);
}
}
// 等待所有應用完成
CompletableFuture.allOf(applyFutures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
// 日誌壓縮
if (toApply - applied > 1000) { // 如果應用了大量日誌,考慮壓縮
log.compactLogs(stateMachine.getLastApplied());
}
})
.exceptionally(e -> {
logger.error("Error during log application", e);
return null;
});
} catch (Exception e) {
logger.error("Error applying committed logs", e);
}
}
// 檢查Leader租約
private void checkLeaderLease() {
if (roleManager.isLeader()) {
roleManager.getLeaderRole().checkLease();
}
}
// 關閉系統
public void shutdown() {
try {
List<Runnable> pendingTasks = scheduler.shutdownNow();
if (!pendingTasks.isEmpty()) {
logger.warn("Scheduler shutdown with {} pending tasks", pendingTasks.size());
}
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
logger.warn("Scheduler did not terminate in time");
}
networking.close();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for scheduler termination");
}
}
// 角色管理
public class RoleManager {
private final MultiPaxosSystem system;
private final AtomicBoolean isLeader = new AtomicBoolean(false);
private final AtomicInteger currentBallot = new AtomicInteger(0);
private volatile int leaderNodeId = -1; // -1表示未知
private final LeaderRole leaderRole;
private final FollowerRole followerRole;
public RoleManager(MultiPaxosSystem system) {
this.system = system;
this.leaderRole = new LeaderRole(system);
this.followerRole = new FollowerRole(system);
}
public boolean isLeader() {
return isLeader.get();
}
public int getLeaderHint() {
return leaderNodeId;
}
public LeaderRole getLeaderRole() {
return leaderRole;
}
public FollowerRole getFollowerRole() {
return followerRole;
}
public int getCurrentBallot() {
return currentBallot.get();
}
public void setCurrentBallot(int ballot) {
currentBallot.set(ballot);
}
public CompletableFuture<Boolean> electSelf() {
return leaderRole.electSelf().thenApply(elected -> {
if (elected) {
isLeader.set(true);
leaderNodeId = nodeId;
}
return elected;
});
}
public void stepDown() {
if (isLeader.compareAndSet(true, false)) {
logger.info("Node {} stepping down from leader", nodeId);
}
}
public void recognizeLeader(int leaderId, int ballot) {
leaderNodeId = leaderId;
currentBallot.set(ballot);
if (leaderId != nodeId) {
isLeader.set(false);
}
}
}
// Leader角色實現
public class LeaderRole {
private final MultiPaxosSystem system;
private final AtomicLong leaseExpirationTime = new AtomicLong(0);
private final long leaderLeaseMs = 5000; // 5秒租約
public LeaderRole(MultiPaxosSystem system) {
this.system = system;
}
// Leader選舉
public CompletableFuture<Boolean> electSelf() {
MDC.put("component", "multi-paxos-leader");
MDC.put("nodeId", String.valueOf(nodeId));
logger.info("Node {} attempting to become leader", nodeId);
try {
int newBallot = generateNewBallot();
MDC.put("ballot", String.valueOf(newBallot));
return CompletableFuture.supplyAsync(() -> {
try {
// 執行Prepare階段
Map<Long, PrepareResponse> responseMap = networking.sendPrepareForAllInstances(newBallot)
.get(10, TimeUnit.SECONDS);
// 檢查是否獲得多數派支持
if (hasQuorumPromises(responseMap)) {
// 根據收集到的信息,更新本地日誌
updateLogFromPromises(responseMap);
// 成為Leader
system.roleManager.setCurrentBallot(newBallot);
system.roleManager.recognizeLeader(nodeId, newBallot);
logger.info("Node {} became leader with ballot {}", nodeId, newBallot);
renewLease();
// 執行接受階段,確保之前的日誌得到多數派接受
confirmPendingLogs();
return true;
} else {
logger.info("Failed to become leader - did not get quorum promises");
return false;
}
} catch (Exception e) {
logger.error("Error in become leader process", e);
return false;
} finally {
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("ballot");
}
});
} catch (Exception e) {
logger.error("Error initiating leader election", e);
MDC.remove("component");
MDC.remove("nodeId");
return CompletableFuture.failedFuture(e);
}
}
// Leader: 追加新日誌
public CompletableFuture<Boolean> appendLog(byte[] command) {
Stopwatch stopwatch = Stopwatch.createStarted();
MDC.put("component", "multi-paxos-leader");
MDC.put("nodeId", String.valueOf(nodeId));
if (!system.roleManager.isLeader()) {
MDC.remove("component");
MDC.remove("nodeId");
return CompletableFuture.failedFuture(
new NotLeaderException("Node is not the leader", system.roleManager.getLeaderHint()));
}
try {
long nextInstance = system.log.getNextInstanceId();
MDC.put("instanceId", String.valueOf(nextInstance));
logger.info("Leader {} appending log at instance {}", nodeId, nextInstance);
// 創建日誌條目
int currentBallot = system.roleManager.getCurrentBallot();
LogEntry entry = new LogEntry(currentBallot, command); // LogEntry構造函數內已做防禦性複製
// 存儲日誌條目
system.log.setEntry(nextInstance, entry);
// 對於已有Leader,可以跳過Prepare階段,直接進入Accept階段
return CompletableFuture.supplyAsync(() -> {
try {
List<AcceptResponse> responses = networking.sendAcceptRequests(
nextInstance, currentBallot, command)
.get(5, TimeUnit.SECONDS);
// 如果多數派接受
int quorum = getQuorum();
if (countAccepts(responses) >= quorum) {
// 提交日誌
entry.setCommitted(true);
system.log.updateCommitIndex(nextInstance);
// 通知所有節點提交
networking.sendCommitNotifications(nextInstance, currentBallot);
logger.info("Log entry at instance {} has been committed", nextInstance);
return true;
} else {
logger.warn("Failed to get quorum for instance {}", nextInstance);
// 可能失去了領導權,嘗試重新選舉
system.roleManager.stepDown();
return false;
}
} catch (Exception e) {
logger.error("Error in append log", e);
throw new CompletionException(e);
} finally {
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("instanceId");
}
});
} catch (Exception e) {
logger.error("Error initiating append log", e);
MDC.remove("component");
MDC.remove("nodeId");
return CompletableFuture.failedFuture(e);
}
}
// 線性一致性讀取(通過Leader確認)
public CompletableFuture<byte[]> linearizableRead(String key) {
if (!system.roleManager.isLeader()) {
return CompletableFuture.failedFuture(
new NotLeaderException("Not the leader", system.roleManager.getLeaderHint()));
}
// 檢查租約
if (System.currentTimeMillis() >= leaseExpirationTime.get()) {
// 租約過期,需要重新確認Leader身份
return renewLease().thenCompose(renewed -> {
if (!renewed) {
return CompletableFuture.failedFuture(
new ConsistencyException("Could not renew leadership lease"));
}
return system.stateMachine.read(key);
});
}
// 租約有效,直接讀取
return system.stateMachine.read(key);
}
// 更新Leader租約
private CompletableFuture<Boolean> renewLease() {
if (!system.roleManager.isLeader()) {
return CompletableFuture.completedFuture(false);
}
return CompletableFuture.supplyAsync(() -> {
try {
// 向多數派發送心跳以確認仍是Leader
int currentBallot = system.roleManager.getCurrentBallot();
int responses = networking.sendLeadershipHeartbeats(currentBallot)
.get(3, TimeUnit.SECONDS);
if (responses >= getQuorum()) {
leaseExpirationTime.set(System.currentTimeMillis() + leaderLeaseMs);
logger.debug("Renewed leader lease until {}", leaseExpirationTime.get());
return true;
} else {
logger.warn("Failed to renew leadership lease");
return false;
}
} catch (Exception e) {
logger.error("Error renewing leadership lease", e);
return false;
}
});
}
// 檢查租約狀態
public void checkLease() {
if (!system.roleManager.isLeader()) {
return;
}
// 如果租約即將過期,嘗試續期
long now = System.currentTimeMillis();
long expiration = leaseExpirationTime.get();
// 如果租約將在1秒內過期,提前續期
if (now + 1000 > expiration) {
renewLease().thenAccept(renewed -> {
if (!renewed) {
logger.warn("Lease renewal failed, stepping down as leader");
system.roleManager.stepDown();
}
});
}
}
// 確保之前的日誌條目被多數派接受
private void confirmPendingLogs() {
// 實現邏輯...
}
// 根據prepare響應更新日誌
private void updateLogFromPromises(Map<Long, PrepareResponse> responseMap) {
// 實現邏輯...
}
// 檢查是否獲得多數派promise
private boolean hasQuorumPromises(Map<Long, PrepareResponse> responseMap) {
// 實現邏輯...
return true; // 簡化
}
// 統計accept響應
private int countAccepts(List<AcceptResponse> responses) {
return (int) responses.stream()
.filter(r -> r != null && r.isAccepted())
.count();
}
}
// Follower角色實現
public class FollowerRole {
private final MultiPaxosSystem system;
private final Map<String, CacheEntry> readCache = new ConcurrentHashMap<>();
private final long maxCacheAgeMs = 5000; // 5秒緩存過期
public FollowerRole(MultiPaxosSystem system) {
this.system = system;
}
// 處理心跳消息
public void handleHeartbeat(int leaderBallot, int leaderNodeId, long leaderCommitIndex) {
// 忽略過期Leader的心跳,避免把ballot與Leader信息回退到舊值
if (leaderBallot < system.roleManager.getCurrentBallot()) {
return;
}
// 更新本地commit index
system.log.updateCommitIndex(leaderCommitIndex);
// 如果自己認為自己是Leader但收到更高ballot的心跳,則退位
if (system.roleManager.isLeader() && leaderBallot > system.roleManager.getCurrentBallot()) {
logger.info("Stepping down as leader due to heartbeat with higher ballot: {}",
leaderBallot);
system.roleManager.stepDown();
}
// 記錄當前Leader
system.roleManager.recognizeLeader(leaderNodeId, leaderBallot);
}
// 順序一致性讀(確保看到所有之前的寫入)
public CompletableFuture<byte[]> sequentialRead(String key) {
// 確保應用了所有已提交的事務
return ensureAppliedUpToCommitIndex()
.thenCompose(v -> system.stateMachine.read(key));
}
// 最終一致性讀(直接從本地讀取)
public CompletableFuture<byte[]> eventualRead(String key) {
return system.stateMachine.read(key);
}
// 確保應用到當前commitIndex
private CompletableFuture<Void> ensureAppliedUpToCommitIndex() {
long current = system.log.getCommitIndex();
long applied = system.stateMachine.getLastApplied();
if (applied >= current) {
return CompletableFuture.completedFuture(null); // 已全部應用
}
// 等待應用完成
CompletableFuture<Void> result = new CompletableFuture<>();
scheduler.execute(() -> {
try {
// 觸發應用
system.applyCommittedLogs();
// 檢查是否應用完成
if (system.stateMachine.getLastApplied() >= current) {
result.complete(null);
} else {
// 簡化處理:稍後再觸發一次應用即完成;生產實現應循環等待直至追平commitIndex
scheduler.schedule(() -> {
system.applyCommittedLogs();
result.complete(null);
}, 50, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
result.completeExceptionally(e);
}
});
return result;
}
// 清理過期緩存
public void cleanupReadCache() {
long now = System.currentTimeMillis();
// 移除過期條目
readCache.entrySet().removeIf(entry ->
now - entry.getValue().getTimestamp() > maxCacheAgeMs);
// 如果緩存過大,移除最舊的條目
if (readCache.size() > config.getMaxCacheSize()) {
List<String> oldestKeys = readCache.entrySet().stream()
.sorted(Comparator.comparingLong(e -> e.getValue().getTimestamp()))
.limit(readCache.size() - config.getMaxCacheSize())
.map(Map.Entry::getKey)
.collect(Collectors.toList());
for (String key : oldestKeys) {
readCache.remove(key);
}
logger.info("Cache cleanup: removed {} old entries", oldestKeys.size());
}
}
}
// 內部組件類
// 日誌管理
public static class MultiPaxosLog {
private final ReadWriteLock logLock = new ReentrantReadWriteLock();
private final ConcurrentNavigableMap<Long, LogEntry> log = new ConcurrentSkipListMap<>();
private final AtomicLong nextInstanceId = new AtomicLong(1);
private final AtomicLong commitIndex = new AtomicLong(0);
private final Logger logger = LoggerFactory.getLogger(MultiPaxosLog.class);
public LogEntry getEntry(long index) {
logLock.readLock().lock();
try {
return log.get(index);
} finally {
logLock.readLock().unlock();
}
}
public void setEntry(long index, LogEntry entry) {
logLock.writeLock().lock();
try {
log.put(index, entry);
nextInstanceId.updateAndGet(current -> Math.max(current, index + 1));
} finally {
logLock.writeLock().unlock();
}
}
public long getNextInstanceId() {
return nextInstanceId.getAndIncrement(); // 注意:每次調用都會分配並遞增實例ID
}
public long getCommitIndex() {
return commitIndex.get();
}
public void updateCommitIndex(long newCommitIndex) {
// 原子更新提交索引,確保只增不減
commitIndex.updateAndGet(current -> Math.max(current, newCommitIndex));
}
// 日誌壓縮
public void compactLogs(long appliedIndex) {
// 保留最近的日誌,刪除舊日誌
final int retentionWindow = 1000; // 保留最近1000條
long truncatePoint = appliedIndex - retentionWindow;
if (truncatePoint <= 0) {
return; // 不需要壓縮
}
logLock.writeLock().lock();
try {
List<Long> toRemove = log.keySet().stream()
.filter(idx -> idx < truncatePoint)
.collect(Collectors.toList());
for (Long idx : toRemove) {
log.remove(idx);
}
logger.info("Compacted {} log entries before index {}",
toRemove.size(), truncatePoint);
} finally {
logLock.writeLock().unlock();
}
}
}
// 狀態機實現
public static class MultiPaxosStateMachine {
private final AtomicLong lastApplied = new AtomicLong(0);
private final Map<String, byte[]> keyValueStore = new ConcurrentHashMap<>();
private final Logger logger = LoggerFactory.getLogger(MultiPaxosStateMachine.class);
public CompletableFuture<Void> apply(long instanceId, byte[] command) {
return CompletableFuture.runAsync(() -> {
try {
// 解析命令
Command cmd = deserializeCommand(command);
// 應用到狀態機
if (cmd.getType() == CommandType.PUT) {
keyValueStore.put(cmd.getKey(), cmd.getValue());
} else if (cmd.getType() == CommandType.DELETE) {
keyValueStore.remove(cmd.getKey());
}
// 更新已應用索引
lastApplied.updateAndGet(current -> Math.max(current, instanceId));
} catch (Exception e) {
logger.error("Error applying command at instance {}", instanceId, e);
throw new CompletionException(e);
}
});
}
public CompletableFuture<byte[]> read(String key) {
return CompletableFuture.supplyAsync(() -> {
byte[] value = keyValueStore.get(key);
return value != null ? value.clone() : null; // 防禦性複製
});
}
public long getLastApplied() {
return lastApplied.get();
}
public CompletableFuture<byte[]> takeSnapshot() {
return CompletableFuture.supplyAsync(() -> {
try {
// 創建狀態機快照
return serializeState();
} catch (Exception e) {
logger.error("Error taking snapshot", e);
throw new CompletionException(e);
}
});
}
public CompletableFuture<Void> restoreSnapshot(byte[] snapshot, long instanceId) {
return CompletableFuture.runAsync(() -> {
try {
// 從快照恢復狀態
deserializeState(snapshot);
// 更新已應用索引
lastApplied.set(instanceId);
} catch (Exception e) {
logger.error("Error restoring snapshot", e);
throw new CompletionException(e);
}
});
}
// 序列化和反序列化輔助方法
private Command deserializeCommand(byte[] data) {
// 實際實現應使用正式的序列化機制
return new Command(CommandType.PUT, "key", data); // 簡化示例
}
private byte[] serializeState() {
// 實際實現應使用正式的序列化機制
return new byte[0]; // 簡化示例
}
private void deserializeState(byte[] data) {
// 實際實現應使用正式的序列化機制
// 簡化示例
}
}
// 網絡層
public static class MultiPaxosNetworking implements AutoCloseable {
private final int nodeId;
private final Map<Integer, NodeInfo> nodes;
private final NetworkClient client;
private final Logger logger = LoggerFactory.getLogger(MultiPaxosNetworking.class);
public MultiPaxosNetworking(int nodeId, Map<Integer, NodeInfo> nodes) {
this.nodeId = nodeId;
this.nodes = new HashMap<>(nodes);
this.client = createNetworkClient();
}
private NetworkClient createNetworkClient() {
// 實際實現應創建合適的網絡客户端
return new NetworkClientImpl();
}
public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareForAllInstances(int ballot) {
// 實現邏輯...
return CompletableFuture.completedFuture(new HashMap<>());
}
public CompletableFuture<List<AcceptResponse>> sendAcceptRequests(
long instanceId, int ballot, byte[] command) {
// 實現邏輯...
return CompletableFuture.completedFuture(new ArrayList<>());
}
public CompletableFuture<Integer> sendLeadershipHeartbeats(int ballot) {
// 實現邏輯...
return CompletableFuture.completedFuture(0);
}
public void sendCommitNotifications(long instanceId, int ballot) {
// 實現邏輯...
}
@Override
public void close() {
// 關閉網絡客户端
}
}
// 生成新的ballot,確保全局唯一性
private int generateNewBallot() {
// 確保新ballot大於之前的,並且不同節點生成的ballot唯一
int currentBallot = roleManager.getCurrentBallot();
return (currentBallot / config.getTotalNodes() + 1) * config.getTotalNodes() + nodeId;
}
// 獲取多數派數量
private int getQuorum() {
return config.getTotalNodes() / 2 + 1;
}
// 日誌條目
public static class LogEntry {
private int ballot;
private final byte[] command;
private volatile boolean committed;
LogEntry(int ballot, byte[] command) {
this.ballot = ballot;
this.command = command.clone(); // 防禦性複製
this.committed = false;
}
public int getBallot() {
return ballot;
}
public void setBallot(int ballot) {
this.ballot = ballot;
}
public byte[] getCommand() {
return command.clone(); // 防禦性複製
}
public boolean isCommitted() {
return committed;
}
public void setCommitted(boolean committed) {
this.committed = committed;
}
}
// 配置類
public static class Configuration {
private final int totalNodes;
private final Map<Integer, NodeInfo> nodes;
private final int maxCacheSize;
public Configuration(int totalNodes, Map<Integer, NodeInfo> nodes, int maxCacheSize) {
this.totalNodes = totalNodes;
this.nodes = new HashMap<>(nodes);
this.maxCacheSize = maxCacheSize;
}
public int getTotalNodes() {
return totalNodes;
}
public Map<Integer, NodeInfo> getNodes() {
return Collections.unmodifiableMap(nodes);
}
public int getMaxCacheSize() {
return maxCacheSize;
}
}
// 節點信息
public static class NodeInfo {
private final int id;
private final String host;
private final int port;
public NodeInfo(int id, String host, int port) {
this.id = id;
this.host = host;
this.port = port;
}
public int getId() {
return id;
}
public String getHost() {
return host;
}
public int getPort() {
return port;
}
}
// 命令類型
enum CommandType {
PUT, DELETE
}
// 命令對象
static class Command {
private final CommandType type;
private final String key;
private final byte[] value;
public Command(CommandType type, String key, byte[] value) {
this.type = type;
this.key = key;
this.value = value != null ? value.clone() : null; // 防禦性複製
}
public CommandType getType() {
return type;
}
public String getKey() {
return key;
}
public byte[] getValue() {
return value != null ? value.clone() : null; // 防禦性複製
}
}
// 響應類
public static class PrepareResponse {
// 實現...
}
public static class AcceptResponse {
private final boolean accepted;
public AcceptResponse(boolean accepted) {
this.accepted = accepted;
}
public boolean isAccepted() {
return accepted;
}
}
// 一致性級別
public enum ConsistencyLevel {
LINEARIZABLE, // 線性一致性
SEQUENTIAL, // 順序一致性
EVENTUAL // 最終一致性
}
// 異常類
public static class NotLeaderException extends RuntimeException {
private final int leaderHint;
public NotLeaderException(String message, int leaderHint) {
super(message);
this.leaderHint = leaderHint;
}
public int getLeaderHint() {
return leaderHint;
}
}
public static class ConsistencyException extends RuntimeException {
public ConsistencyException(String message) {
super(message);
}
}
// 簡化的網絡客户端實現
private static class NetworkClientImpl implements NetworkClient {
// 佔位實現:返回failedFuture而非null,避免調用方鏈式調用時拋出NPE;
// 真實部署時應替換為基於Netty/gRPC等的實現
@Override
public CompletableFuture<Promise> sendPrepare(int nodeId, int ballot) {
return CompletableFuture.failedFuture(
new UnsupportedOperationException("network client not implemented"));
}
@Override
public CompletableFuture<Accepted> sendAccept(int nodeId, int ballot, Object value) {
return CompletableFuture.failedFuture(
new UnsupportedOperationException("network client not implemented"));
}
@Override
public void sendLearn(int nodeId, long instanceId, int ballot, Object value) {
// 佔位:無操作
}
@Override
public CompletableFuture<Map<Long, PrepareResponse>> sendPrepareAllInstances(int nodeId, int ballot) {
return CompletableFuture.failedFuture(
new UnsupportedOperationException("network client not implemented"));
}
@Override
public CompletableFuture<Void> sendSnapshot(int nodeId, byte[] snapshot, long lastInstanceId) {
return CompletableFuture.failedFuture(
new UnsupportedOperationException("network client not implemented"));
}
}
}
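MultiPaxosSystem 的最小使用方式如下面的示意代碼所示(節點地址、端口與緩存大小均為假設值;在佔位的網絡實現下流程可以走通,但不會產生真實的網絡交互):
// 組裝3節點配置
Map<Integer, MultiPaxosSystem.NodeInfo> nodes = Map.of(
    0, new MultiPaxosSystem.NodeInfo(0, "127.0.0.1", 9000),
    1, new MultiPaxosSystem.NodeInfo(1, "127.0.0.1", 9001),
    2, new MultiPaxosSystem.NodeInfo(2, "127.0.0.1", 9002));
MultiPaxosSystem.Configuration config =
    new MultiPaxosSystem.Configuration(3, nodes, 10_000);
MultiPaxosSystem node0 = new MultiPaxosSystem(0, config);
// 先競選Leader,成功後寫入一條命令,再以最終一致性讀回
node0.electSelf()
    .thenCompose(elected -> elected
        ? node0.appendLog("put:key1:value1".getBytes(StandardCharsets.UTF_8))
        : CompletableFuture.completedFuture(false))
    .thenCompose(ok -> node0.read("key1", MultiPaxosSystem.ConsistencyLevel.EVENTUAL))
    .thenAccept(value -> System.out.println(
        value == null ? "key1 不存在" : new String(value, StandardCharsets.UTF_8)));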
四、網絡分區處理與成員變更
網絡分區檢測
public class PartitionHandler implements AutoCloseable {
private final String nodeId;
private final AtomicLong lastHeartbeatTime = new AtomicLong(0);
private final AtomicBoolean suspectPartition = new AtomicBoolean(false);
private final ScheduledExecutorService scheduler;
private final long heartbeatTimeoutMs;
private final Consumer<PartitionEvent> partitionCallback;
private final Logger logger = LoggerFactory.getLogger(PartitionHandler.class);
public PartitionHandler(String nodeId, long heartbeatTimeoutMs,
Consumer<PartitionEvent> partitionCallback) {
this.nodeId = nodeId;
this.heartbeatTimeoutMs = heartbeatTimeoutMs;
this.partitionCallback = partitionCallback;
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "partition-detector-" + nodeId);
t.setDaemon(true);
return t;
});
// 啓動心跳檢測任務
scheduler.scheduleAtFixedRate(
this::checkHeartbeat,
heartbeatTimeoutMs / 2,
heartbeatTimeoutMs / 2,
TimeUnit.MILLISECONDS
);
}
// 記錄收到心跳
public void recordHeartbeat() {
lastHeartbeatTime.set(System.currentTimeMillis());
if (suspectPartition.compareAndSet(true, false)) {
logger.info("Node {} no longer suspects network partition", nodeId);
partitionCallback.accept(new PartitionEvent(PartitionStatus.RECOVERED, nodeId));
}
}
// 檢查心跳超時
private void checkHeartbeat() {
try {
long now = System.currentTimeMillis();
long last = lastHeartbeatTime.get();
if (last > 0 && now - last > heartbeatTimeoutMs) {
// 可能存在網絡分區
if (suspectPartition.compareAndSet(false, true)) {
logger.warn("Node {} suspects network partition, last heartbeat: {}ms ago",
nodeId, now - last);
// 執行分區檢測回調
partitionCallback.accept(new PartitionEvent(PartitionStatus.SUSPECTED, nodeId));
}
}
} catch (Exception e) {
logger.error("Error checking heartbeat", e);
}
}
@Override
public void close() {
scheduler.shutdownNow();
try {
if (!scheduler.awaitTermination(1, TimeUnit.SECONDS)) {
logger.warn("Partition detector scheduler did not terminate in time");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while shutting down partition detector");
}
}
// 分區狀態枚舉
public enum PartitionStatus {
SUSPECTED, // 懷疑發生分區
CONFIRMED, // 確認發生分區
RECOVERED // 分區已恢復
}
// 分區事件類
public static class PartitionEvent {
private final PartitionStatus status;
private final String nodeId;
public PartitionEvent(PartitionStatus status, String nodeId) {
this.status = status;
this.nodeId = nodeId;
}
public PartitionStatus getStatus() {
return status;
}
public String getNodeId() {
return nodeId;
}
}
}
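PartitionHandler 的典型接法如下(超時閾值與回調中的處理邏輯均為示意,實際應接到降級或重新選舉流程):
// 心跳超時超過5秒即懷疑分區;回調中僅打印,實際可切換只讀模式、觸發重新選舉等
PartitionHandler detector = new PartitionHandler("server1", 5000, event -> {
    if (event.getStatus() == PartitionHandler.PartitionStatus.SUSPECTED) {
        System.out.println("疑似網絡分區: " + event.getNodeId());
    } else if (event.getStatus() == PartitionHandler.PartitionStatus.RECOVERED) {
        System.out.println("分區已恢復: " + event.getNodeId());
    }
});
// 每次收到Leader心跳時調用,復位檢測器
detector.recordHeartbeat();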
成員變更實現
public class MembershipManager implements AutoCloseable {
private final ConcurrentMap<String, ServerInfo> servers = new ConcurrentHashMap<>();
private volatile Configuration currentConfig;
private final AtomicLong configVersion = new AtomicLong(0);
private final String nodeId;
private final AtomicBoolean isLeader = new AtomicBoolean(false);
private final Logger logger = LoggerFactory.getLogger(MembershipManager.class);
private final ConfigurationStore configStore;
private final NetworkClient networkClient;
private final StampedLock configLock = new StampedLock();
public MembershipManager(String nodeId, boolean isLeader,
ConfigurationStore configStore,
NetworkClient networkClient) {
this.nodeId = nodeId;
this.isLeader.set(isLeader);
this.configStore = configStore;
this.networkClient = networkClient;
// 初始化配置
try {
this.currentConfig = configStore.loadConfiguration();
if (this.currentConfig == null) {
this.currentConfig = new Configuration(configVersion.get(), new HashMap<>());
}
servers.putAll(currentConfig.getServers());
} catch (IOException e) {
logger.error("Failed to load configuration", e);
this.currentConfig = new Configuration(configVersion.get(), new HashMap<>());
}
}
// 兩階段成員變更 - 安全添加節點
public CompletableFuture<Boolean> addServer(String serverId, String address, int port) {
MDC.put("component", "membership-manager");
MDC.put("nodeId", nodeId);
MDC.put("targetServerId", serverId);
if (!isLeader.get()) {
logger.warn("Only leader can change membership");
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("targetServerId");
return CompletableFuture.failedFuture(
new IllegalStateException("Only leader can change membership"));
}
CompletableFuture<Boolean> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
long stamp = configLock.writeLock();
try {
logger.info("Starting server addition: {}", serverId);
// 第一階段:創建過渡配置(包含新舊所有節點)
Configuration oldConfig = currentConfig;
Configuration jointConfig = createJointConfig(oldConfig, serverId, address, port);
// 將過渡配置提交給集羣
commitConfiguration(jointConfig).thenAccept(committed -> {
if (!committed) {
logger.warn("Failed to commit joint configuration for server {}", serverId);
result.complete(false);
return;
}
logger.info("Joint configuration committed, proceeding to second phase");
// 第二階段:創建新配置(確認包含新節點)
Configuration newConfig = createNewConfig(jointConfig);
// 將新配置提交給集羣
commitConfiguration(newConfig).thenAccept(finalCommitted -> {
if (finalCommitted) {
logger.info("Server {} successfully added to cluster", serverId);
} else {
logger.warn("Failed to commit final configuration for server {}", serverId);
}
result.complete(finalCommitted);
}).exceptionally(e -> {
logger.error("Error committing final configuration for server {}",
serverId, e);
result.completeExceptionally(e);
return null;
});
}).exceptionally(e -> {
logger.error("Error committing joint configuration for server {}",
serverId, e);
result.completeExceptionally(e);
return null;
});
} catch (Exception e) {
logger.error("Error adding server {}", serverId, e);
result.completeExceptionally(e);
} finally {
configLock.unlockWrite(stamp);
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("targetServerId");
}
});
return result;
}
// 兩階段成員變更 - 安全移除節點
public CompletableFuture<Boolean> removeServer(String serverId) {
MDC.put("component", "membership-manager");
MDC.put("nodeId", nodeId);
MDC.put("targetServerId", serverId);
if (!isLeader.get()) {
logger.warn("Only leader can change membership");
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("targetServerId");
return CompletableFuture.failedFuture(
new IllegalStateException("Only leader can change membership"));
}
if (!servers.containsKey(serverId)) {
logger.warn("Server {} not found in configuration", serverId);
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("targetServerId");
return CompletableFuture.completedFuture(false);
}
CompletableFuture<Boolean> result = new CompletableFuture<>();
CompletableFuture.runAsync(() -> {
long stamp = configLock.writeLock();
try {
logger.info("Starting server removal: {}", serverId);
// 第一階段:創建過渡配置(標記要移除的節點)
Configuration oldConfig = currentConfig;
Configuration jointConfig = createJointConfig(oldConfig, serverId);
// 將過渡配置提交給集羣
commitConfiguration(jointConfig).thenAccept(committed -> {
if (!committed) {
logger.warn("Failed to commit joint configuration for removing server {}",
serverId);
result.complete(false);
return;
}
logger.info("Joint configuration committed, proceeding to second phase");
// 第二階段:創建新配置(移除目標節點)
Configuration newConfig = createNewConfigWithout(jointConfig, serverId);
// 將新配置提交給集羣
commitConfiguration(newConfig).thenAccept(finalCommitted -> {
if (finalCommitted) {
logger.info("Server {} successfully removed from cluster", serverId);
} else {
logger.warn("Failed to commit final configuration for removing server {}",
serverId);
}
result.complete(finalCommitted);
}).exceptionally(e -> {
logger.error("Error committing final configuration for removing server {}",
serverId, e);
result.completeExceptionally(e);
return null;
});
}).exceptionally(e -> {
logger.error("Error committing joint configuration for removing server {}",
serverId, e);
result.completeExceptionally(e);
return null;
});
} catch (Exception e) {
logger.error("Error removing server {}", serverId, e);
result.completeExceptionally(e);
} finally {
configLock.unlockWrite(stamp);
MDC.remove("component");
MDC.remove("nodeId");
MDC.remove("targetServerId");
}
});
return result;
}
// 創建過渡配置(添加節點)
private Configuration createJointConfig(Configuration oldConfig,
String newServerId, String address, int port) {
Map<String, ServerInfo> newServers = new HashMap<>(oldConfig.getServers());
newServers.put(newServerId, new ServerInfo(newServerId, address, port));
return new Configuration(configVersion.incrementAndGet(), newServers);
}
// 創建過渡配置(刪除節點)
private Configuration createJointConfig(Configuration oldConfig, String serverId) {
// 標記要刪除的節點(在過渡配置中仍存在,但標記為待移除)
Map<String, ServerInfo> jointServers = new HashMap<>(oldConfig.getServers());
ServerInfo serverInfo = jointServers.get(serverId);
if (serverInfo != null) {
ServerInfo markedServer = new ServerInfo(
serverId, serverInfo.getAddress(), serverInfo.getPort(), true);
jointServers.put(serverId, markedServer);
}
return new Configuration(configVersion.incrementAndGet(), jointServers);
}
// 創建新配置(確認添加節點)
private Configuration createNewConfig(Configuration jointConfig) {
// 最終配置,清除所有標記
Map<String, ServerInfo> newServers = new HashMap<>();
for (var entry : jointConfig.getServers().entrySet()) {
if (!entry.getValue().isMarkedForRemoval()) {
newServers.put(entry.getKey(), new ServerInfo(
entry.getValue().getId(),
entry.getValue().getAddress(),
entry.getValue().getPort(),
false
));
}
}
return new Configuration(configVersion.incrementAndGet(), newServers);
}
// 創建新配置(確認刪除節點)
private Configuration createNewConfigWithout(Configuration jointConfig, String serverId) {
Map<String, ServerInfo> newServers = new HashMap<>();
for (var entry : jointConfig.getServers().entrySet()) {
if (!entry.getKey().equals(serverId) && !entry.getValue().isMarkedForRemoval()) {
newServers.put(entry.getKey(), new ServerInfo(
entry.getValue().getId(),
entry.getValue().getAddress(),
entry.getValue().getPort(),
false
));
}
}
return new Configuration(configVersion.incrementAndGet(), newServers);
}
// 提交配置變更
private CompletableFuture<Boolean> commitConfiguration(Configuration config) {
return CompletableFuture.supplyAsync(() -> {
try {
// 實際實現會通過共識算法提交配置變更
logger.info("Committing configuration version {}", config.getVersion());
// 持久化配置
configStore.saveConfiguration(config);
// 更新本地配置
synchronized (this) {
currentConfig = config;
servers.clear();
servers.putAll(config.getServers());
}
// 廣播配置變更
broadcastConfigChange(config);
return true;
} catch (Exception e) {
logger.error("Error committing configuration", e);
return false;
}
});
}
// 廣播配置變更
private void broadcastConfigChange(Configuration config) {
// 向所有節點廣播配置變更
for (String serverId : servers.keySet()) {
if (!serverId.equals(nodeId)) {
CompletableFuture.runAsync(() -> {
try {
// 實際實現中調用網絡客户端發送配置
notifyConfigChange(serverId, config);
} catch (Exception e) {
logger.error("Failed to notify server {} of config change", serverId, e);
}
});
}
}
}
// 通知節點配置變更
private void notifyConfigChange(String serverId, Configuration config) {
// 實際實現會發送配置給指定節點
logger.debug("Notifying server {} of configuration change to version {}",
serverId, config.getVersion());
}
// 處理接收到的配置變更
public void handleConfigChange(Configuration newConfig) {
long stamp = configLock.writeLock();
try {
if (newConfig.getVersion() > currentConfig.getVersion()) {
try {
// 持久化新配置
configStore.saveConfiguration(newConfig);
// 更新本地配置
currentConfig = newConfig;
servers.clear();
servers.putAll(newConfig.getServers());
logger.info("Updated to new configuration version {}", newConfig.getVersion());
} catch (IOException e) {
logger.error("Failed to persist new configuration", e);
}
} else {
logger.debug("Ignoring old configuration version {} (current is {})",
newConfig.getVersion(), currentConfig.getVersion());
}
} finally {
configLock.unlockWrite(stamp);
}
}
// 獲取當前配置
public Configuration getCurrentConfig() {
long stamp = configLock.tryOptimisticRead();
Configuration config = currentConfig;
if (!configLock.validate(stamp)) {
stamp = configLock.readLock();
try {
config = currentConfig;
} finally {
configLock.unlockRead(stamp);
}
}
return config;
}
// 檢查節點是否在配置中(不包括標記為移除的節點)
public boolean isServerInConfig(String serverId) {
ServerInfo info = servers.get(serverId);
return info != null && !info.isMarkedForRemoval();
}
// 獲取有效服務器數量(不包括標記為移除的節點)
public int getActiveServerCount() {
return (int) servers.values().stream()
.filter(s -> !s.isMarkedForRemoval())
.count();
}
// 設置Leader狀態
public void setLeader(boolean isLeader) {
this.isLeader.set(isLeader);
}
@Override
public void close() {
// 釋放資源
}
// 配置類
public static class Configuration implements Serializable {
private static final long serialVersionUID = 1L;
private final long version;
private final Map<String, ServerInfo> servers;
public Configuration(long version, Map<String, ServerInfo> servers) {
this.version = version;
this.servers = new HashMap<>(servers);
}
public long getVersion() {
return version;
}
public Map<String, ServerInfo> getServers() {
return Collections.unmodifiableMap(servers);
}
}
// 服務器信息
public static class ServerInfo implements Serializable {
private static final long serialVersionUID = 1L;
private final String id;
private final String address;
private final int port;
private final boolean markedForRemoval;
public ServerInfo(String id, String address, int port) {
this(id, address, port, false);
}
public ServerInfo(String id, String address, int port, boolean markedForRemoval) {
this.id = id;
this.address = address;
this.port = port;
this.markedForRemoval = markedForRemoval;
}
public String getId() {
return id;
}
public String getAddress() {
return address;
}
public int getPort() {
return port;
}
public boolean isMarkedForRemoval() {
return markedForRemoval;
}
}
}
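成員變更的調用入口很簡單,兩階段提交流程由 MembershipManager 內部驅動。下面是一個示意(configStore 與 networkClient 假設已按上文接口構造完成):
MembershipManager manager = new MembershipManager("server1", true, configStore, networkClient);
// 添加節點:內部先提交包含新舊節點的過渡配置,再提交最終配置
manager.addServer("server4", "10.0.0.4", 8004)
    .thenAccept(ok -> System.out.println(ok ? "server4 已加入集羣" : "加入失敗"));
// 移除節點走同樣的兩階段流程
manager.removeServer("server2")
    .thenAccept(ok -> System.out.println(ok ? "server2 已移除" : "移除失敗"));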
配置存儲實現
public class FileBasedConfigurationStore implements ConfigurationStore {
private final Path configPath;
private final Path snapshotDir;
private final Logger logger = LoggerFactory.getLogger(FileBasedConfigurationStore.class);
public FileBasedConfigurationStore(Path configPath, Path snapshotDir) {
this.configPath = configPath;
this.snapshotDir = snapshotDir;
try {
Files.createDirectories(configPath.getParent());
Files.createDirectories(snapshotDir);
} catch (IOException e) {
logger.error("Failed to create directories", e);
throw new UncheckedIOException("Failed to create directories", e);
}
}
@Override
public void saveConfiguration(MembershipManager.Configuration config) throws IOException {
// 使用原子寫入保證一致性
Path tempPath = configPath.resolveSibling(configPath.getFileName() + ".tmp");
try (ObjectOutputStream oos = new ObjectOutputStream(
new BufferedOutputStream(Files.newOutputStream(tempPath)))) {
oos.writeObject(config);
oos.flush();
Files.move(tempPath, configPath, StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING);
logger.info("Configuration version {} saved successfully", config.getVersion());
} catch (IOException e) {
logger.error("Failed to save configuration", e);
throw e;
}
}
@Override
public MembershipManager.Configuration loadConfiguration() throws IOException {
if (!Files.exists(configPath)) {
logger.info("Configuration file does not exist: {}", configPath);
return null;
}
try (ObjectInputStream ois = new ObjectInputStream(
new BufferedInputStream(Files.newInputStream(configPath)))) {
MembershipManager.Configuration config =
(MembershipManager.Configuration) ois.readObject();
logger.info("Loaded configuration version {}", config.getVersion());
return config;
} catch (ClassNotFoundException e) {
logger.error("Failed to deserialize configuration", e);
throw new IOException("Failed to deserialize configuration", e);
}
}
@Override
public void saveSnapshot(long index, byte[] data) throws IOException {
// 創建快照文件名,包含索引
String snapshotFileName = String.format("snapshot-%020d.bin", index);
Path snapshotPath = snapshotDir.resolve(snapshotFileName);
Path tempPath = snapshotDir.resolve(snapshotFileName + ".tmp");
try {
// 寫入臨時文件
Files.write(tempPath, data);
// 原子移動
Files.move(tempPath, snapshotPath, StandardCopyOption.ATOMIC_MOVE,
StandardCopyOption.REPLACE_EXISTING);
logger.info("Snapshot at index {} saved successfully, size: {} bytes",
index, data.length);
// 清理舊快照,保留最近的5個
cleanupOldSnapshots(5);
} catch (IOException e) {
logger.error("Failed to save snapshot at index {}", index, e);
throw e;
}
}
@Override
public SnapshotInfo loadLatestSnapshot() throws IOException {
// 使用try-with-resources關閉目錄流,避免文件句柄泄漏
try (var files = Files.list(snapshotDir)) {
// 查找最新的快照文件
Optional<Path> latestSnapshot = files
.filter(p -> p.getFileName().toString().startsWith("snapshot-") &&
p.getFileName().toString().endsWith(".bin"))
.max(Comparator.comparing(p -> p.getFileName().toString()));
if (latestSnapshot.isPresent()) {
Path snapshotPath = latestSnapshot.get();
String fileName = snapshotPath.getFileName().toString();
// 從文件名中提取索引("snapshot-"後的20位數字)
long index = Long.parseLong(fileName.substring(9, 29));
// 讀取快照數據
byte[] data = Files.readAllBytes(snapshotPath);
logger.info("Loaded snapshot at index {}, size: {} bytes", index, data.length);
return new SnapshotInfo(index, data);
} else {
logger.info("No snapshot found in directory: {}", snapshotDir);
return null;
}
} catch (IOException e) {
logger.error("Failed to load latest snapshot", e);
throw e;
}
}
// 清理舊快照,只保留最新的n個
private void cleanupOldSnapshots(int keepCount) throws IOException {
// 同樣使用try-with-resources關閉目錄流
try (var files = Files.list(snapshotDir)) {
List<Path> snapshots = files
.filter(p -> p.getFileName().toString().startsWith("snapshot-") &&
p.getFileName().toString().endsWith(".bin"))
.sorted(Comparator.comparing(p -> p.getFileName().toString()))
.collect(Collectors.toList());
// 如果快照數量超過保留數量,刪除舊的
if (snapshots.size() > keepCount) {
int toDelete = snapshots.size() - keepCount;
for (int i = 0; i < toDelete; i++) {
Files.delete(snapshots.get(i));
logger.info("Deleted old snapshot: {}", snapshots.get(i).getFileName());
}
}
} catch (IOException e) {
logger.error("Failed to cleanup old snapshots", e);
throw e;
}
}
// 快照信息類直接復用 ConfigurationStore.SnapshotInfo,
// 避免在實現類中重複定義同名類導致 loadLatestSnapshot 的返回類型與接口不匹配
}
// 配置存儲接口
public interface ConfigurationStore {
void saveConfiguration(MembershipManager.Configuration config) throws IOException;
MembershipManager.Configuration loadConfiguration() throws IOException;
void saveSnapshot(long index, byte[] data) throws IOException;
SnapshotInfo loadLatestSnapshot() throws IOException;
// 快照信息內部類定義
class SnapshotInfo {
private final long index;
private final byte[] data;
public SnapshotInfo(long index, byte[] data) {
this.index = index;
this.data = data.clone();
}
public long getIndex() {
return index;
}
public byte[] getData() {
return data.clone();
}
}
}
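FileBasedConfigurationStore 的使用方式示意如下(文件路徑為假設值):
try {
    ConfigurationStore store = new FileBasedConfigurationStore(
        Paths.get("/var/lib/consensus/config.bin"),
        Paths.get("/var/lib/consensus/snapshots"));
    MembershipManager.Configuration cfg = store.loadConfiguration();
    System.out.println(cfg == null ? "尚無持久化配置" : "當前配置版本: " + cfg.getVersion());
    // 保存一份快照,內部會原子寫入並自動清理舊文件
    store.saveSnapshot(42L, new byte[]{1, 2, 3});
} catch (IOException e) {
    e.printStackTrace();
}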
跨數據中心複製支持
public class CrossDCReplication implements AutoCloseable {
private final String localDC;
private final List<String> allDCs;
private final Map<String, DCConnection> dcConnections;
private final ConsensusSystem localSystem;
private final Logger logger = LoggerFactory.getLogger(CrossDCReplication.class);
private final ScheduledExecutorService scheduler;
private final AtomicLong replicationIndex = new AtomicLong(0);
private final ConcurrentMap<String, AtomicLong> dcReplicationProgress = new ConcurrentHashMap<>();
public CrossDCReplication(String localDC, List<String> allDCs,
ConsensusSystem localSystem,
Map<String, DCConnectionConfig> dcConfigs) {
this.localDC = localDC;
this.allDCs = new ArrayList<>(allDCs);
this.localSystem = localSystem;
this.dcConnections = new HashMap<>();
// 初始化數據中心連接
for (String dc : allDCs) {
if (!dc.equals(localDC)) {
DCConnectionConfig config = dcConfigs.get(dc);
if (config != null) {
dcConnections.put(dc, new DCConnection(dc, config));
dcReplicationProgress.put(dc, new AtomicLong(0));
}
}
}
this.scheduler = Executors.newScheduledThreadPool(2, r -> {
Thread t = new Thread(r, "dc-replication-scheduler");
t.setDaemon(true);
return t;
});
// 啓動定期複製任務
scheduler.scheduleWithFixedDelay(
this::replicateChanges,
1000, 1000, TimeUnit.MILLISECONDS
);
// 啓動健康檢查任務
scheduler.scheduleWithFixedDelay(
this::checkDCHealth,
5000, 5000, TimeUnit.MILLISECONDS
);
}
// 複製請求到其他數據中心
public CompletableFuture<Boolean> replicateRequest(Request request) {
MDC.put("component", "cross-dc-replication");
MDC.put("requestId", request.getId());
try {
// 1. 首先在本地DC處理請求
return localSystem.processWrite(request)
.thenCompose(localSuccess -> {
if (!localSuccess) {
logger.warn("Request {} failed in local DC", request.getId());
return CompletableFuture.completedFuture(false);
}
// 2. 如果本地成功,更新複製索引
long index = replicationIndex.incrementAndGet();
// 3. 異步複製到其他數據中心
List<CompletableFuture<Boolean>> dcFutures = new ArrayList<>();
for (var entry : dcConnections.entrySet()) {
String dc = entry.getKey();
DCConnection connection = entry.getValue();
dcFutures.add(connection.replicateRequest(request, index)
.thenApply(success -> {
if (success) {
// 更新複製進度
dcReplicationProgress.get(dc).updateAndGet(
current -> Math.max(current, index));
logger.info("Request {} successfully replicated to DC {}",
request.getId(), dc);
} else {
logger.warn("Failed to replicate request {} to DC {}",
request.getId(), dc);
}
return success;
})
.exceptionally(e -> {
logger.error("Error replicating request {} to DC {}",
request.getId(), dc, e);
return false;
}));
}
// 4. 等待所有DC的響應,基於配置的複製策略
return handleDCReplications(dcFutures);
});
} finally {
MDC.remove("component");
MDC.remove("requestId");
}
}
// 根據複製策略處理跨DC複製結果
private CompletableFuture<Boolean> handleDCReplications(
List<CompletableFuture<Boolean>> dcFutures) {
ReplicationStrategy strategy = ReplicationStrategy.QUORUM; // 可配置
switch (strategy) {
case ALL:
// 所有DC都必須成功
return CompletableFuture.allOf(
dcFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> dcFutures.stream()
.allMatch(f -> {
try {
return f.get();
} catch (Exception e) {
return false;
}
}));
case ANY:
// 至少一個DC成功
return CompletableFuture.supplyAsync(() -> awaitSuccesses(dcFutures, 1));
case ASYNC:
// 異步複製,不等待結果
return CompletableFuture.completedFuture(true);
case QUORUM:
default:
// 多數DC必須成功;default與QUORUM合併,避免重複代碼並滿足編譯器對返回路徑的要求
return CompletableFuture.supplyAsync(() ->
awaitSuccesses(dcFutures, dcFutures.size() / 2 + 1));
}
}
// 阻塞等待,直到獲得所需數量的成功響應
private boolean awaitSuccesses(List<CompletableFuture<Boolean>> futures, int required) {
int successCount = 0;
for (CompletableFuture<Boolean> future : futures) {
try {
if (future.get(5, TimeUnit.SECONDS)) {
successCount++;
if (successCount >= required) {
return true;
}
}
} catch (Exception e) {
logger.warn("Error waiting for DC replication", e);
}
}
return successCount >= required;
}
// 定期同步數據中心之間的變更
private void replicateChanges() {
try {
// 獲取當前複製進度
Map<String, Long> progress = new HashMap<>();
for (var entry : dcReplicationProgress.entrySet()) {
progress.put(entry.getKey(), entry.getValue().get());
}
// 對每個DC,複製尚未同步的變更
for (var entry : dcConnections.entrySet()) {
String dc = entry.getKey();
DCConnection connection = entry.getValue();
long currentProgress = progress.get(dc);
if (currentProgress < replicationIndex.get()) {
// 查找需要複製的變更
List<ReplicationEntry> changes =
getChangesSince(currentProgress, replicationIndex.get());
if (!changes.isEmpty()) {
connection.replicateChanges(changes)
.thenAccept(lastIndex -> {
if (lastIndex > currentProgress) {
// 更新複製進度
dcReplicationProgress.get(dc).updateAndGet(
current -> Math.max(current, lastIndex));
logger.info("Replicated changes to DC {} up to index {}",
dc, lastIndex);
}
})
.exceptionally(e -> {
logger.error("Failed to replicate changes to DC {}", dc, e);
return null;
});
}
}
}
} catch (Exception e) {
logger.error("Error in replication task", e);
}
}
// 檢查數據中心健康狀態
private void checkDCHealth() {
for (var entry : dcConnections.entrySet()) {
String dc = entry.getKey();
DCConnection connection = entry.getValue();
connection.checkHealth()
.thenAccept(healthy -> {
if (healthy) {
if (connection.markHealthy()) {
logger.info("DC {} is now healthy", dc);
}
} else {
if (connection.markUnhealthy()) {
logger.warn("DC {} is now unhealthy", dc);
}
}
})
.exceptionally(e -> {
logger.error("Error checking health of DC {}", dc, e);
connection.markUnhealthy();
return null;
});
}
}
// 獲取指定範圍內的變更
private List<ReplicationEntry> getChangesSince(long fromIndex, long toIndex) {
// 實際實現應從日誌存儲中檢索變更
List<ReplicationEntry> changes = new ArrayList<>();
// 簡化示例
for (long i = fromIndex + 1; i <= toIndex; i++) {
// 模擬獲取變更
changes.add(new ReplicationEntry(i, null));
}
return changes;
}
@Override
public void close() {
// 關閉調度器
scheduler.shutdownNow();
try {
if (!scheduler.awaitTermination(5, TimeUnit.SECONDS)) {
logger.warn("Scheduler did not terminate in time");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.warn("Interrupted while waiting for scheduler termination");
}
// 關閉所有DC連接
for (DCConnection connection : dcConnections.values()) {
connection.close();
}
}
// 數據中心連接類
private class DCConnection implements AutoCloseable {
private final String dcId;
private final DCConnectionConfig config;
private final AtomicBoolean healthy = new AtomicBoolean(true);
private final NetworkClient networkClient;
public DCConnection(String dcId, DCConnectionConfig config) {
this.dcId = dcId;
this.config = config;
this.networkClient = createNetworkClient();
}
private NetworkClient createNetworkClient() {
// 創建用於跨DC通信的網絡客户端
// 簡化示例
return null;
}
public CompletableFuture<Boolean> replicateRequest(Request request, long index) {
if (!healthy.get()) {
return CompletableFuture.completedFuture(false);
}
// 實際實現中,將請求發送到目標DC
return CompletableFuture.completedFuture(true);
}
public CompletableFuture<Long> replicateChanges(List<ReplicationEntry> changes) {
if (!healthy.get() || changes.isEmpty()) {
return CompletableFuture.completedFuture(0L);
}
// 實際實現中,將變更批量發送到目標DC
long lastIndex = changes.get(changes.size() - 1).getIndex();
return CompletableFuture.completedFuture(lastIndex);
}
public CompletableFuture<Boolean> checkHealth() {
// 實際實現中,執行健康檢查
return CompletableFuture.completedFuture(true);
}
public boolean markHealthy() {
return healthy.compareAndSet(false, true);
}
public boolean markUnhealthy() {
return healthy.compareAndSet(true, false);
}
@Override
public void close() {
// 關閉網絡客户端
}
}
// 複製條目
private static class ReplicationEntry {
private final long index;
private final byte[] data;
public ReplicationEntry(long index, byte[] data) {
this.index = index;
this.data = data != null ? data.clone() : null;
}
public long getIndex() {
return index;
}
public byte[] getData() {
return data != null ? data.clone() : null;
}
}
// 數據中心連接配置
public static class DCConnectionConfig {
private final String primaryEndpoint;
private final List<String> backupEndpoints;
private final int connectTimeoutMs;
private final int readTimeoutMs;
public DCConnectionConfig(String primaryEndpoint, List<String> backupEndpoints,
int connectTimeoutMs, int readTimeoutMs) {
this.primaryEndpoint = primaryEndpoint;
this.backupEndpoints = new ArrayList<>(backupEndpoints);
this.connectTimeoutMs = connectTimeoutMs;
this.readTimeoutMs = readTimeoutMs;
}
// Getters...
}
// Replication strategy
public enum ReplicationStrategy {
ALL, // Every DC must succeed
QUORUM, // A majority of DCs must succeed
ANY, // At least one DC must succeed
ASYNC // Asynchronous replication; do not wait for results
}
}
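The ReplicationStrategy values above map naturally onto a success predicate over per-DC results. The helper below is a minimal sketch of that mapping; it assumes the ReplicationStrategy enum defined above is in scope, and the class and method names are ours, not part of the replicator.

import java.util.List;

// Hypothetical helper: decide whether a replication round succeeded
// under a given ReplicationStrategy, given one boolean result per target DC.
final class ReplicationOutcome {
    private ReplicationOutcome() {}

    static boolean isSatisfied(ReplicationStrategy strategy, List<Boolean> dcResults) {
        long successes = dcResults.stream().filter(Boolean::booleanValue).count();
        switch (strategy) {
            case ALL:    return successes == dcResults.size();    // every DC replied OK
            case QUORUM: return successes > dcResults.size() / 2; // strict majority of DCs
            case ANY:    return successes >= 1;                   // at least one DC replied OK
            case ASYNC:  return true;                             // fire-and-forget: caller does not wait
            default:     throw new IllegalArgumentException("Unknown strategy: " + strategy);
        }
    }
}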
5. How ZAB and Paxos Relate and Differ
Similarities
What the two have in common:
- Majority (quorum) mechanism: both require acknowledgement from more than half of the nodes to guarantee safety and prevent split-brain
- Phased operation: both reach consensus through two main phases, a prepare phase and a commit/accept phase
- Safety guarantee: neither ever reaches an inconsistent state, under any circumstances
- Fault tolerance: both keep working when a minority of nodes fail
- Network partitions: both preserve safety under partition, preferring to stop serving rather than break consistency
Differences
Key differences:
- Design goal:
  - ZAB: a state-machine replication protocol built specifically for ZooKeeper, emphasizing replication and ordering of the system as a whole
  - Paxos: a general-purpose distributed consensus algorithm, focused on deciding a single value
- Leader roles:
  - ZAB: an explicit Leader-Follower architecture with centralized processing
  - Basic Paxos: symmetric roles in the original design, with no fixed Leader
  - Multi-Paxos: adds a Leader as an optimization while remaining role-symmetric in theory
- Message ordering:
  - ZAB: guarantees strict FIFO processing, using the ZXID (epoch + counter) to establish a global order
  - Basic Paxos: no ordering guarantee; it only settles a single value
  - Multi-Paxos: can order operations through instance IDs, but needs extra machinery to do so
- Recovery:
  - ZAB: a dedicated crash-recovery mode with election, discovery, synchronization, and activation phases
  - Paxos: recovers through the ordinary algorithm flow, with no special recovery mode
- Transaction identifiers:
  - ZAB: uses the ZXID (epoch + counter) as a globally unique identifier
  - Paxos: uses a proposal (ballot) number to identify proposals and an instance ID to identify log positions
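Since both the ordering and the identifier rows come down to the ZXID layout, it is worth showing directly: ZooKeeper packs the epoch into the high 32 bits of a 64-bit value and a per-epoch counter into the low 32 bits. The small helper below sketches that encoding; the class and method names are ours, not part of any ZooKeeper API.

// Sketch of the ZXID layout: high 32 bits = epoch, low 32 bits = per-epoch counter.
public final class Zxid {
    private Zxid() {}

    public static long make(int epoch, int counter) {
        return ((long) epoch << 32) | (counter & 0xFFFFFFFFL);
    }

    public static int epochOf(long zxid) {
        return (int) (zxid >>> 32);
    }

    public static int counterOf(long zxid) {
        return (int) zxid; // truncation keeps the low 32 bits
    }
}

Because the epoch occupies the most significant bits, comparing two ZXIDs as plain longs orders transactions first by epoch and then by counter, which is exactly the global order ZAB relies on.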
6. Performance Comparison and Engineering Practice
Performance comparison
| Metric | ZAB | Basic Paxos | Multi-Paxos |
|---|---|---|---|
| Write latency | 2 RTT (normal mode) | 2 RTT | 1 RTT (with a stable Leader) |
| Read latency | 0 RTT (local read) to 1 RTT (consistent read) | 2 RTT | 0 RTT (local read) to 1 RTT (consistent read) |
| Write throughput | High (batching optimizations) | Low | Medium to high (batching optimizations) |
| Read throughput | Very high (local reads) | Low | High (local reads) |
| Message complexity | O(n) | O(n²) | O(n) (stable Leader) |
| CPU usage | Medium | High | Medium to high |
| Memory footprint | Medium | Medium | Medium |
| Recovery time | Short (dedicated recovery mechanism) | Long | Medium |
Horizontal scalability
How performance changes as the cluster grows:
| Cluster size | ZAB | Paxos |
|---|---|---|
| 3 nodes | High throughput, low latency | Medium throughput, medium latency |
| 5 nodes | Good throughput, slightly higher latency | Lower throughput, higher latency |
| 7 nodes | Reduced throughput, higher latency | Sharply reduced throughput, high latency |
| 9+ nodes | Not recommended (noticeable degradation) | Not recommended (noticeable degradation) |
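The degradation in the table follows directly from quorum size: every write must be acknowledged by a majority, which for an n-node ensemble is floor(n/2) + 1, so each added pair of nodes puts one more acknowledgement on the write critical path. A tiny self-contained example makes this concrete:

// Majority quorum for an n-node ensemble: floor(n/2) + 1.
public final class QuorumMath {
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    public static void main(String[] args) {
        // 3 -> 2, 5 -> 3, 7 -> 4, 9 -> 5 acknowledgements per write
        for (int n : new int[] {3, 5, 7, 9}) {
            System.out.println(n + " nodes -> quorum of " + quorumSize(n));
        }
    }
}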
JVM tuning recommendations
/**
 * Recommended JVM settings:
 * -Xms4g -Xmx4g                          // Fix the heap size to avoid runtime resizing
 * -XX:+UseG1GC                           // Use the G1 garbage collector
 * -XX:MaxGCPauseMillis=200               // Target maximum GC pause
 * -XX:InitiatingHeapOccupancyPercent=45  // Heap occupancy that starts a GC cycle
 * -XX:+AlwaysPreTouch                    // Pre-touch memory pages at startup
 * -XX:+DisableExplicitGC                 // Ignore explicit System.gc() calls
 * -XX:+HeapDumpOnOutOfMemoryError        // Dump the heap on OutOfMemoryError
 * -XX:HeapDumpPath=/path/to/dumps        // Heap dump location
 * -XX:+UseCompressedOops                 // Compressed ordinary object pointers
 * -XX:+UseCompressedClassPointers        // Compressed class pointers
 * -Djava.net.preferIPv4Stack=true        // Prefer the IPv4 stack
 */
Selection guidance and engineering best practices
- Tune timeouts sensibly:
  - Timeouts that are too short trigger unnecessary elections
  - Timeouts that are too long slow down failure recovery
  - Adjust them dynamically to match the network environment
- Batch requests (see the sketch after this list):
  - Merge several write requests into one batch
  - Cut the number of network round trips
  - Raise overall throughput
- Separate reads from writes:
  - Route writes through the Leader
  - Serve reads locally where the consistency requirement allows
  - Use a read cache to reduce disk I/O
- Monitor the key metrics:
  - Commit latency
  - Leader change frequency
  - Request queue depth
  - Network latency and bandwidth usage
- Preventive maintenance:
  - Compact logs regularly
  - Take snapshots
  - Watch disk space
  - Run failure drills to exercise the recovery path
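As a concrete illustration of the batching advice above, here is a minimal sketch of a write batcher that flushes either when the batch is full or when a linger timer fires. All names here (WriteBatcher, the flush callback, and so on) are ours, not part of any code shown earlier.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical request batcher: buffers writes and hands them off as a single
// batch when either the size limit is reached or the linger deadline expires.
public class WriteBatcher<T> implements AutoCloseable {
    private final int maxBatchSize;
    private final Consumer<List<T>> flush; // e.g. wrap the batch in one proposal
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();
    private final List<T> pending = new ArrayList<>();

    public WriteBatcher(int maxBatchSize, long lingerMs, Consumer<List<T>> flush) {
        this.maxBatchSize = maxBatchSize;
        this.flush = flush;
        // The periodic flush bounds how long a request can sit in the buffer.
        timer.scheduleAtFixedRate(this::flushNow, lingerMs, lingerMs, TimeUnit.MILLISECONDS);
    }

    public synchronized void submit(T request) {
        pending.add(request);
        if (pending.size() >= maxBatchSize) {
            flushNow();
        }
    }

    private synchronized void flushNow() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        flush.accept(batch); // one network round trip for the whole batch
    }

    @Override
    public void close() {
        flushNow();
        timer.shutdown();
    }
}

The trade-off is the usual one: a larger batch or longer linger raises throughput but adds latency to individual writes, so both knobs should be set from measured traffic rather than guessed.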
7. Unit Test Examples
@RunWith(MockitoJUnitRunner.class)
public class ZABBroadcastTest {
private ZABBroadcast zabBroadcast;
private AtomicLong zxid;
private AtomicInteger epoch;
@Mock
private NetworkClient mockNetworkClient;
@Mock
private StateMachine mockStateMachine;
@Before
public void setUp() {
zxid = new AtomicLong(0);
epoch = new AtomicInteger(0);
zabBroadcast = new ZABBroadcast("server1", zxid, epoch, mockNetworkClient, mockStateMachine);
// Register two followers
ServerData follower1 = new ServerData("server2", "localhost", 8001);
ServerData follower2 = new ServerData("server3", "localhost", 8002);
zabBroadcast.addFollower(follower1);
zabBroadcast.addFollower(follower2);
}
@After
public void tearDown() {
zabBroadcast.close();
}
@Test
public void testProcessWriteSuccess() throws Exception {
// Stub the mock: every proposal is acknowledged
ACK successAck = new ACK(true, zxid.get() + 1);
when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
.thenReturn(successAck);
// Run the write
Request request = new Request("req1", "test data".getBytes());
CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
// Verify the result
assertTrue(result.get(1, TimeUnit.SECONDS));
// Verify the interactions
verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class));
}
@Test
public void testProcessWriteFailure() throws Exception {
// Stub the mocks: one follower acknowledges, the other rejects
when(mockNetworkClient.sendProposal(eq("server2"), any(ProposalPacket.class)))
.thenReturn(new ACK(true, zxid.get() + 1));
when(mockNetworkClient.sendProposal(eq("server3"), any(ProposalPacket.class)))
.thenReturn(new ACK(false, zxid.get()));
// Run the write
Request request = new Request("req1", "test data".getBytes());
CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
// Expect failure: no majority of acknowledgements
assertFalse(result.get(1, TimeUnit.SECONDS));
// Verify interactions: no commit may be sent
verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
verify(mockNetworkClient, never()).sendCommit(anyString(), any(CommitPacket.class));
}
@Test
public void testBatchWritePerformance() throws Exception {
// Stub the mock: every proposal is acknowledged
ACK successAck = new ACK(true, zxid.get() + 1);
when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
.thenReturn(successAck);
// Build a batch of 100 requests
List<Request> requests = new ArrayList<>();
for (int i = 0; i < 100; i++) {
requests.add(new Request("req" + i, ("data" + i).getBytes()));
}
// Run the batch write
Stopwatch stopwatch = Stopwatch.createStarted();
CompletableFuture<Map<String, Boolean>> result = zabBroadcast.processBatchWrite(requests);
Map<String, Boolean> results = result.get(5, TimeUnit.SECONDS);
stopwatch.stop();
// Verify the results
assertEquals(100, results.size());
assertTrue(results.values().stream().allMatch(v -> v));
// Print timing data
System.out.println("Batch write of 100 requests took " +
stopwatch.elapsed(TimeUnit.MILLISECONDS) + "ms");
// Verify interactions: one round trip per follower for the whole batch
verify(mockNetworkClient, times(2)).sendProposal(anyString(), any(ProposalPacket.class));
verify(mockNetworkClient, times(2)).sendCommit(anyString(), any(CommitPacket.class));
}
@Test
public void testCircuitBreakerTrip() throws Exception {
// Stub the mock: every proposal is rejected
when(mockNetworkClient.sendProposal(anyString(), any(ProposalPacket.class)))
.thenReturn(new ACK(false, zxid.get()));
// Issue enough failing writes to trip the circuit breaker
Request request = new Request("req1", "test data".getBytes());
for (int i = 0; i < 5; i++) {
try {
CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
result.get(1, TimeUnit.SECONDS);
} catch (Exception e) {
// Ignore the expected failures
}
}
// The sixth request should be rejected outright by the open breaker
try {
CompletableFuture<Boolean> result = zabBroadcast.processWrite(request);
result.get(1, TimeUnit.SECONDS);
fail("Should have thrown CircuitBreakerOpenException");
} catch (ExecutionException e) {
assertTrue(e.getCause() instanceof ProcessingException);
assertTrue(e.getCause().getCause() instanceof ZABBroadcast.CircuitBreakerOpenException);
}
}
@Test
public void testReadWithConsistencyLevels() throws Exception {
// Exercise reads at each consistency level. sendHeartbeat returns void,
// so it must be stubbed with doNothing() rather than when(...).thenReturn()
doNothing().when(mockNetworkClient).sendHeartbeat(anyString(), anyLong());
// Linearizable read
CompletableFuture<Result> linearResult =
zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.LINEARIZABLE);
// Sequential read
CompletableFuture<Result> sequentialResult =
zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.SEQUENTIAL);
// Eventual read
CompletableFuture<Result> eventualResult =
zabBroadcast.readWithConsistency("testKey", ConsistencyLevel.EVENTUAL);
// All three reads should complete
assertNotNull(linearResult.get(1, TimeUnit.SECONDS));
assertNotNull(sequentialResult.get(1, TimeUnit.SECONDS));
assertNotNull(eventualResult.get(1, TimeUnit.SECONDS));
}
}
8. Client API Examples
public class DistributedSystemClient implements AutoCloseable {
private final ZabClient zabClient;
private final PaxosClient paxosClient;
private final Logger logger = LoggerFactory.getLogger(DistributedSystemClient.class);
public DistributedSystemClient(String zkConnectString, String paxosConnectString) {
this.zabClient = new ZabClient(zkConnectString);
this.paxosClient = new PaxosClient(paxosConnectString);
}
// ZAB client example: configuration service
public class ZabClient implements AutoCloseable {
private final String connectString;
private final CuratorFramework client;
public ZabClient(String connectString) {
this.connectString = connectString;
this.client = CuratorFramework.builder()
.connectString(connectString)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
this.client.start();
}
// Store a configuration value
public void storeConfig(String path, String data) throws Exception {
try {
// Check whether the path already exists
if (client.checkExists().forPath(path) == null) {
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
logger.info("Created config at path: {}", path);
} else {
client.setData()
.forPath(path, data.getBytes(StandardCharsets.UTF_8));
logger.info("Updated config at path: {}", path);
}
} catch (Exception e) {
logger.error("Failed to store config at path: {}", path, e);
throw e;
}
}
// Read a configuration value
public String getConfig(String path) throws Exception {
try {
byte[] data = client.getData().forPath(path);
return new String(data, StandardCharsets.UTF_8);
} catch (Exception e) {
logger.error("Failed to read config from path: {}", path, e);
throw e;
}
}
// Watch for configuration changes
public void watchConfig(String path, Consumer<String> changeCallback) throws Exception {
try {
// Register the watcher
client.getData().usingWatcher(new CuratorWatcher() {
@Override
public void process(WatchedEvent event) throws Exception {
if (event.getType() == EventType.NodeDataChanged) {
String newData = getConfig(path);
changeCallback.accept(newData);
// Re-register the watch (ZooKeeper watches fire only once)
watchConfig(path, changeCallback);
}
}
}).forPath(path);
logger.info("Set watch on path: {}", path);
} catch (Exception e) {
logger.error("Failed to set watch on path: {}", path, e);
throw e;
}
}
// Distributed lock
public DistributedLock getLock(String lockPath) {
return new DistributedLock(client, lockPath);
}
@Override
public void close() {
client.close();
}
// Distributed lock implementation
public class DistributedLock {
private final InterProcessMutex mutex;
private final String lockPath;
public DistributedLock(CuratorFramework client, String lockPath) {
this.lockPath = lockPath;
this.mutex = new InterProcessMutex(client, lockPath);
}
public void lock(long timeout, TimeUnit unit) throws Exception {
if (mutex.acquire(timeout, unit)) {
logger.info("Acquired lock: {}", lockPath);
} else {
logger.warn("Failed to acquire lock: {} within timeout", lockPath);
throw new TimeoutException("Failed to acquire lock: " + lockPath);
}
}
public void unlock() {
try {
mutex.release();
logger.info("Released lock: {}", lockPath);
} catch (Exception e) {
logger.error("Error releasing lock: {}", lockPath, e);
}
}
}
}
// Paxos client example: distributed KV store
public class PaxosClient implements AutoCloseable {
private final String connectString;
private final PaxosKVStore kvStore;
public PaxosClient(String connectString) {
this.connectString = connectString;
this.kvStore = new PaxosKVStore(connectString);
}
// Write a key-value pair
public CompletableFuture<Boolean> put(String key, String value,
ConsistencyLevel consistencyLevel) {
return kvStore.put(key, value, consistencyLevel);
}
// Read a value
public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) {
return kvStore.get(key, consistencyLevel);
}
// Delete a key
public CompletableFuture<Boolean> delete(String key) {
return kvStore.delete(key);
}
@Override
public void close() {
kvStore.close();
}
// Paxos KV store implementation
private class PaxosKVStore implements AutoCloseable {
private final PaxosClient client;
public PaxosKVStore(String connectString) {
// A real implementation would connect to the Paxos cluster
this.client = null; // Simplified example
}
public CompletableFuture<Boolean> put(String key, String value,
ConsistencyLevel consistencyLevel) {
// A real implementation would submit the write through the Paxos protocol
logger.info("Putting key: {} with consistency: {}", key, consistencyLevel);
return CompletableFuture.completedFuture(true);
}
public CompletableFuture<String> get(String key, ConsistencyLevel consistencyLevel) {
// A real implementation would choose a read strategy based on the consistency level
logger.info("Getting key: {} with consistency: {}", key, consistencyLevel);
return CompletableFuture.completedFuture("value");
}
public CompletableFuture<Boolean> delete(String key) {
// A delete is also a write, and goes through the Paxos protocol
logger.info("Deleting key: {}", key);
return CompletableFuture.completedFuture(true);
}
@Override
public void close() {
// Release resources
}
}
}
// Usage example
public void runExample() throws Exception {
// ZAB client usage
try (ZabClient zab = new ZabClient("localhost:2181")) {
// Store a configuration value
zab.storeConfig("/app/config", "{\"timeout\": 30, \"maxRetries\": 3}");
// Read it back
String config = zab.getConfig("/app/config");
System.out.println("Config: " + config);
// Watch for changes
zab.watchConfig("/app/config", newConfig -> {
System.out.println("Config changed: " + newConfig);
});
// Use the distributed lock
ZabClient.DistributedLock lock = zab.getLock("/app/locks/resource1");
try {
lock.lock(10, TimeUnit.SECONDS);
// Critical-section work
System.out.println("Performing critical operation...");
Thread.sleep(1000);
} finally {
lock.unlock();
}
}
// Paxos client usage
try (PaxosClient paxos = new PaxosClient("localhost:8000,localhost:8001,localhost:8002")) {
// Write
paxos.put("user:1001", "{\"name\":\"John\",\"email\":\"john@example.com\"}",
ConsistencyLevel.LINEARIZABLE)
.thenAccept(success -> {
System.out.println("Write success: " + success);
})
.join();
// Read
paxos.get("user:1001", ConsistencyLevel.SEQUENTIAL)
.thenAccept(value -> {
System.out.println("User data: " + value);
})
.join();
// Delete
paxos.delete("user:1001")
.thenAccept(success -> {
System.out.println("Delete success: " + success);
})
.join();
}
}
@Override
public void close() throws Exception {
zabClient.close();
paxosClient.close();
}
}
9. Summary
| Aspect | ZAB | Paxos |
|---|---|---|
| Design goal | State-machine replication | Distributed consensus |
| Leader roles | Explicit Leader-Follower | Basic Paxos has no fixed roles; Multi-Paxos adds a Leader optimization |
| Message ordering | Strict FIFO | Basic Paxos gives no ordering; Multi-Paxos can provide it |
| Recovery | Dedicated recovery mode | Recovers through the normal algorithm flow |
| Implementation complexity | Medium | High |
| Best fit | Systems needing strong consistency and ordering guarantees | General distributed systems, especially single-value decisions |
| Typical applications | ZooKeeper | Chubby; etcd (via Raft, a closely related protocol) |
| Performance profile | Writes go through the Leader; reads are fast | The basic form is slow; optimized variants can be fast |
| Extensibility | Tied to the ZooKeeper architecture | The underlying theory is easier to extend and adapt |
ZAB and Paxos are both first-class distributed consistency algorithms and sit at the core of modern distributed system design. Understanding how they work, how they are implemented, and where each fits is essential for building reliable distributed systems.
Whichever algorithm you choose, weigh it against the application's consistency requirements and performance goals. With the engineering practices and optimizations shown in this article, developers can build high-performance, highly reliable distributed systems that meet the needs of complex, real-world workloads.