序
本文主要研究一下Elasticsearch的RoundRobinSupplier
RoundRobinSupplier
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java
final class RoundRobinSupplierimplements Supplier{ private final AtomicBoolean selectorsSet = new AtomicBoolean(false); private volatile S[] selectors; private AtomicInteger counter = new AtomicInteger(0); RoundRobinSupplier() { this.selectors = null; } RoundRobinSupplier(S[] selectors) { this.selectors = selectors; this.selectorsSet.set(true); } @Override public S get() { S[] selectors = this.selectors; return selectors[counter.getAndIncrement() % selectors.length]; } void setSelectors(S[] selectors) { if (selectorsSet.compareAndSet(false, true)) { this.selectors = selectors; } else { throw new AssertionError("Selectors already set. Should only be set once."); } } int count() { return selectors.length; }}
- RoundRobinSupplier实现了Supplier接口,其get方法使用
counter.getAndIncrement() % selectors.length
来选择selectors数组的下标,然后返回该下标的值
NioSelectorGroup
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java
public class NioSelectorGroup implements NioGroup { private final ListdedicatedAcceptors; private final RoundRobinSupplier acceptorSupplier; private final List selectors; private final RoundRobinSupplier selectorSupplier; private final AtomicBoolean isOpen = new AtomicBoolean(true); //...... public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory, int selectorCount, Function , EventHandler> eventHandlerFunction) throws IOException { dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount); selectors = new ArrayList<>(selectorCount); try { List > suppliersToSet = new ArrayList<>(selectorCount); for (int i = 0; i < selectorCount; ++i) { RoundRobinSupplier supplier = new RoundRobinSupplier<>(); suppliersToSet.add(supplier); NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier)); selectors.add(selector); } for (RoundRobinSupplier supplierToSet : suppliersToSet) { supplierToSet.setSelectors(selectors.toArray(new NioSelector[0])); assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list."; } for (int i = 0; i < dedicatedAcceptorCount; ++i) { RoundRobinSupplier supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier)); dedicatedAcceptors.add(acceptor); } if (dedicatedAcceptorCount != 0) { acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0])); } else { acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); } selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0])); assert selectorCount == selectors.size() : "We need to have created all the selectors at this point."; assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point."; startSelectors(selectors, selectorThreadFactory); startSelectors(dedicatedAcceptors, acceptorThreadFactory); } catch (Exception e) { try { close(); } catch (Exception e1) { e.addSuppressed(e1); } throw e; } } public S bindServerChannel(InetSocketAddress address, ChannelFactoryfactory) throws IOException { ensureOpen(); return factory.openNioServerSocketChannel(address, acceptorSupplier); } @Override publicS openChannel(InetSocketAddress address, ChannelFactory factory) throws IOException { ensureOpen(); return factory.openNioChannel(address, selectorSupplier); } //......}
- NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)
ChannelFactory
elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java
public abstract class ChannelFactory{ //...... public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier supplier) throws IOException { ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address); NioSelector selector = supplier.get(); ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel); scheduleServerChannel(serverChannel, selector); return serverChannel; } public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier supplier) throws IOException { SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress); NioSelector selector = supplier.get(); Socket channel = internalCreateChannel(selector, rawChannel); scheduleChannel(channel, selector); return channel; } //......}
- ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier<NioSelector>参数,通过该supplier来选取NioSelector
小结
- RoundRobinSupplier实现了Supplier接口,其get方法使用
counter.getAndIncrement() % selectors.length
来选择selectors数组的下标,然后返回该下标的值 - NioSelectorGroup的构造器创建了两个RoundRobinSupplier,分别是acceptorSupplier及selectorSupplier;bindServerChannel方法执行的是factory.openNioServerSocketChannel(address, acceptorSupplier);openChannel方法执行的是factory.openNioChannel(address, selectorSupplier)
- ChannelFactory的openNioServerSocketChannel及openNioChannel方法都接收Supplier<NioSelector>参数,通过该supplier来选取NioSelector