Paxos Quorum面临的困境
在原生的Basic-Paxos或者Multi-Paxos中,Quorum的数量要求的是多数派,例如:一个5成员组成的Paxos集群,Prepare和Accept阶段需要获得3个Acceptor的支持。 Quorum=3的条件,在原生的Paxos中是硬性条件,在一些场景中,我们需要对提案的收敛更快,也就是希望提案能尽快的达成共识,那么我们希望尽可能的减少Quorum要求的数量。
NWR
NWR是分布式一致性策略模型,通过NWR,我们可以动态调节一致性的强度,它描述的是:
NWR值的不同组合会产生不同的一致性效果,例如:
Paxos结合NWR的思考
回顾Paxos的两个阶段(Prepare和Accept),Prepare阶段的作用有两个: 1. Prepare阶段要求获得多数派的支持,目的是为了获取集群中可能达成共识的提案。 2. 如果Prepare阶段获取到多数派中任意一个Acceptor批准过某个提案,那么Accept阶段只能以该提案在集群中复制。 3. 如果Prepare阶段获取到多数派中没有一个Acceptor批准过任何提案,那么Accept阶段可以用任意提案在集群中复制。 因此我们可以认为Paxos的Prepare阶段是一个读阶段,而Accept阶段是一个写阶段。 Paxos要保证已达成共识的提案不会再改变,那么就要求Prepare(读)阶段和Accept(写)阶段有能够交流信息的媒介,因为要读阶段告诉写阶段,应该写入哪个值嘛! 那这个媒介从哪里来呢?关键就在于多数派,多数派的含义:两个多数派(Prepare阶段的多数派和Accept阶段的多数派)一定存在一个相交的成员。这个相交的成员就是交流信息的媒介,我们只需要控制这个相交成员,让这个相交的成员告诉写阶段,应该写入什么值。
验证NWR
那这个是不是很像W + R > N的场景呢。我们验证一下Paxos的两个保证:
在一个5成员的集群中,我们设定Prepare阶段的Quorum为2,Accept阶段Quorum为4。
场景一(在一个instance上不会有多个提案达成共识),两个成员都获得Prepare的支持,都进入了Accept阶段。如下图所示:
1. A发起Prepare,proposalNo=1,获得了Quorum的支持 2. C发起Prepare,proposalNo=2,获得了Quorum的支持 3. A发起Accept,但是CD的propsalNo大于A,所以A未能达成共识 4. C发起Accept,但是CD的propsalNo大于A,达成共识
综上仍然只有一个提案达成共识。另外,假如第4步,C失败了。B发起Prepare,但是收到A和C提出的不同的提案应该怎么选择,这里还是跟原生Paxos一样,选择proposalNo大的那个。
场景二(已达成共识的提案不会改变),A已达成共识的提案,会不会因为B的协商而改变。如下图所示:
1. A发起Prepare,proposalNo=1,获得了Quorum的支持 2. A发起Accept,获得了Quorum的支持,已达成共识 3. C发起Prepare,proposalNo=1,获得Quorum的支持,但是收到C和D已批准提案的响应 4. C发起Accept,用C和D已批准提案的进行协商
综上已达成共识的提案不会再改变。
在Klein中的实践
Klein(https://github.com/shihuili1218/klein)是一个基于 Paxos 的分布式集合工具库,包括分布式ArrayList、分布式 HashMap、分布式缓存、分布式锁等。
定义NWR接口,为了让用户自己实现NWR策略,这里提供SPI接口
@SPI
public interface Nwr {
/**
* calculate read quorum.
*
* @param n total size
* @return read quorum
*/
int r(int n);
/**
* calculate write quorum.
*
* @param n total size
* @return write quorum
*/
int w(int n);
}
定义NWR策略为R = N,W = 1,这样可以最大限度加快协商收敛,让提案尽快达成共识,只需要等到一个成员的批准,就认为已经达成共识了。
@Join
public class FastWriteNwr implements Nwr {
@Override
public int r(final int n) {
return n;
}
@Override
public int w(final int n) {
return 1;
}
}
在初始化的时候使用FastWriteNwr,
public ProposeContext(final PaxosMemberConfiguration memberConfiguration, final Holder<Long> instanceIdHolder, final List<ProposalWithDone> events) {
this.memberConfiguration = memberConfiguration;
this.instanceIdHolder = instanceIdHolder;
this.dataWithCallback = ImmutableList.copyOf(events);
this.prepareQuorum = QuorumFactory.createReadQuorum(memberConfiguration);
this.prepareNexted = new AtomicBoolean(false);
this.acceptQuorum = QuorumFactory.createWriteQuorum(memberConfiguration);
this.acceptNexted = new AtomicBoolean(false);
}
public static Quorum createWriteQuorum(final MemberConfiguration memberConfiguration) {
Nwr nwr = ExtensionLoader.getExtensionLoader(Nwr.class).getJoin();
LOG.debug("create write quorum, nwr: {}", nwr.getClass());
if (CollectionUtils.isEmpty(memberConfiguration.getLastMembers())) {
return new SingleQuorum(memberConfiguration.getEffectMembers(),
nwr.w(memberConfiguration.getEffectMembers().size()));
} else {
return new JoinConsensusQuorum(memberConfiguration.getEffectMembers(), memberConfiguration.getLastMembers(),
nwr.w(memberConfiguration.getAllMembers().size()));
}
}
public static Quorum createReadQuorum(final MemberConfiguration memberConfiguration) {
Nwr nwr = ExtensionLoader.getExtensionLoader(Nwr.class).getJoin();
LOG.debug("create read quorum, nwr: {}", nwr.getClass());
if (CollectionUtils.isEmpty(memberConfiguration.getLastMembers())) {
return new SingleQuorum(memberConfiguration.getEffectMembers(),
nwr.r(memberConfiguration.getEffectMembers().size()));
} else {
return new JoinConsensusQuorum(memberConfiguration.getEffectMembers(), memberConfiguration.getLastMembers(),
nwr.r(memberConfiguration.getAllMembers().size()));
}
}