A Look at Elasticsearch's RoundRobinSupplier
Published: 2019-06-24

This article takes a look at Elasticsearch's RoundRobinSupplier.

RoundRobinSupplier

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/RoundRobinSupplier.java

final class RoundRobinSupplier<S> implements Supplier<S> {

    private final AtomicBoolean selectorsSet = new AtomicBoolean(false);
    private volatile S[] selectors;
    private AtomicInteger counter = new AtomicInteger(0);

    RoundRobinSupplier() {
        this.selectors = null;
    }

    RoundRobinSupplier(S[] selectors) {
        this.selectors = selectors;
        this.selectorsSet.set(true);
    }

    @Override
    public S get() {
        S[] selectors = this.selectors;
        return selectors[counter.getAndIncrement() % selectors.length];
    }

    void setSelectors(S[] selectors) {
        if (selectorsSet.compareAndSet(false, true)) {
            this.selectors = selectors;
        } else {
            throw new AssertionError("Selectors already set. Should only be set once.");
        }
    }

    int count() {
        return selectors.length;
    }
}

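  • RoundRobinSupplier implements the Supplier interface. Its get method computes counter.getAndIncrement() % selectors.length to pick an index into the selectors array and returns the element at that index. The selectors array is either passed to the constructor or supplied later through setSelectors, which may be called only once; a second call throws an AssertionError.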
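
To make the rotation concrete, here is a minimal, self-contained sketch of the same index arithmetic. RoundRobinDemo is a hypothetical class written for this illustration and is not part of Elasticsearch. It differs from the 7.0.1 source in one deliberate way: it uses Math.floorMod instead of %. In Java, getAndIncrement() wraps to a negative value after Integer.MAX_VALUE and % keeps the sign of the dividend, so the plain expression would yield a negative index at that point. Using floorMod is just one possible guard, shown here as an assumption rather than as what the library does.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical demo class; it mirrors the index arithmetic of RoundRobinSupplier.get().
final class RoundRobinDemo<S> implements Supplier<S> {
    private final S[] items;
    private final AtomicInteger counter;

    // 'start' lets the demo begin near Integer.MAX_VALUE to exercise the wrap-around.
    RoundRobinDemo(S[] items, int start) {
        this.items = items;
        this.counter = new AtomicInteger(start);
    }

    @Override
    public S get() {
        // floorMod returns a value in [0, items.length) even after the counter wraps negative.
        return items[Math.floorMod(counter.getAndIncrement(), items.length)];
    }

    public static void main(String[] args) {
        RoundRobinDemo<String> demo = new RoundRobinDemo<>(new String[] {"s0", "s1", "s2"}, 0);
        StringBuilder order = new StringBuilder();
        for (int i = 0; i < 5; i++) {
            order.append(demo.get()).append(' ');
        }
        System.out.println(order.toString().trim()); // s0 s1 s2 s0 s1

        // Plain % keeps the sign of the dividend, so a wrapped counter gives a negative index.
        System.out.println(Integer.MIN_VALUE % 3);               // -2
        System.out.println(Math.floorMod(Integer.MIN_VALUE, 3)); // 1
    }
}
```

Reaching the wrap-around takes 2^31 calls on a single supplier, so whether it matters depends on how long-lived and how busy the supplier is.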

NioSelectorGroup

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/NioSelectorGroup.java

public class NioSelectorGroup implements NioGroup {

    private final List<NioSelector> dedicatedAcceptors;
    private final RoundRobinSupplier<NioSelector> acceptorSupplier;

    private final List<NioSelector> selectors;
    private final RoundRobinSupplier<NioSelector> selectorSupplier;

    private final AtomicBoolean isOpen = new AtomicBoolean(true);

    //......

    public NioSelectorGroup(ThreadFactory acceptorThreadFactory, int dedicatedAcceptorCount, ThreadFactory selectorThreadFactory,
                            int selectorCount, Function<Supplier<NioSelector>, EventHandler> eventHandlerFunction) throws IOException {
        dedicatedAcceptors = new ArrayList<>(dedicatedAcceptorCount);
        selectors = new ArrayList<>(selectorCount);

        try {
            List<RoundRobinSupplier<NioSelector>> suppliersToSet = new ArrayList<>(selectorCount);
            for (int i = 0; i < selectorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>();
                suppliersToSet.add(supplier);
                NioSelector selector = new NioSelector(eventHandlerFunction.apply(supplier));
                selectors.add(selector);
            }
            for (RoundRobinSupplier<NioSelector> supplierToSet : suppliersToSet) {
                supplierToSet.setSelectors(selectors.toArray(new NioSelector[0]));
                assert supplierToSet.count() == selectors.size() : "Supplier should have same count as selector list.";
            }

            for (int i = 0; i < dedicatedAcceptorCount; ++i) {
                RoundRobinSupplier<NioSelector> supplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
                NioSelector acceptor = new NioSelector(eventHandlerFunction.apply(supplier));
                dedicatedAcceptors.add(acceptor);
            }

            if (dedicatedAcceptorCount != 0) {
                acceptorSupplier = new RoundRobinSupplier<>(dedicatedAcceptors.toArray(new NioSelector[0]));
            } else {
                acceptorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            }
            selectorSupplier = new RoundRobinSupplier<>(selectors.toArray(new NioSelector[0]));
            assert selectorCount == selectors.size() : "We need to have created all the selectors at this point.";
            assert dedicatedAcceptorCount == dedicatedAcceptors.size() : "We need to have created all the acceptors at this point.";

            startSelectors(selectors, selectorThreadFactory);
            startSelectors(dedicatedAcceptors, acceptorThreadFactory);
        } catch (Exception e) {
            try {
                close();
            } catch (Exception e1) {
                e.addSuppressed(e1);
            }
            throw e;
        }
    }

    public <S extends NioServerSocketChannel> S bindServerChannel(InetSocketAddress address, ChannelFactory<S, ?> factory)
        throws IOException {
        ensureOpen();
        return factory.openNioServerSocketChannel(address, acceptorSupplier);
    }

    @Override
    public <S extends NioSocketChannel> S openChannel(InetSocketAddress address, ChannelFactory<?, S> factory) throws IOException {
        ensureOpen();
        return factory.openNioChannel(address, selectorSupplier);
    }

    //......
}

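  • The NioSelectorGroup constructor initializes two RoundRobinSupplier fields, acceptorSupplier and selectorSupplier. acceptorSupplier rotates over the dedicated acceptors when dedicatedAcceptorCount is non-zero and falls back to the regular selectors otherwise. The constructor also creates one RoundRobinSupplier per selector and per dedicated acceptor and passes it to eventHandlerFunction. bindServerChannel calls factory.openNioServerSocketChannel(address, acceptorSupplier); openChannel calls factory.openNioChannel(address, selectorSupplier).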
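
The constructor above explains why RoundRobinSupplier has both a no-argument constructor and setSelectors: each selector's event handler needs a supplier over all selectors, but the full list only exists once the first loop has finished. The following sketch reproduces that two-phase wiring in isolation. Worker, LateBoundSupplier and WiringDemo are hypothetical names invented for this example, and the sketch throws IllegalStateException where the original throws AssertionError.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical stand-in for a selector: it only keeps a name and the supplier it was given.
final class Worker {
    final String name;
    final Supplier<Worker> peers;

    Worker(String name, Supplier<Worker> peers) {
        this.name = name;
        this.peers = peers;
    }
}

// Hypothetical supplier whose backing array is provided exactly once, after construction.
final class LateBoundSupplier<S> implements Supplier<S> {
    private final AtomicBoolean itemsSet = new AtomicBoolean(false);
    private volatile S[] items;
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public S get() {
        S[] local = this.items; // read the volatile field once
        return local[Math.floorMod(counter.getAndIncrement(), local.length)];
    }

    void setItems(S[] items) {
        if (itemsSet.compareAndSet(false, true)) {
            this.items = items;
        } else {
            // The original throws AssertionError here; the exception type is this sketch's choice.
            throw new IllegalStateException("Items already set. Should only be set once.");
        }
    }
}

final class WiringDemo {
    static List<Worker> build(int count) {
        List<LateBoundSupplier<Worker>> pending = new ArrayList<>(count);
        List<Worker> workers = new ArrayList<>(count);
        // Phase 1: each worker receives a supplier that is still empty.
        for (int i = 0; i < count; i++) {
            LateBoundSupplier<Worker> supplier = new LateBoundSupplier<>();
            pending.add(supplier);
            workers.add(new Worker("worker-" + i, supplier));
        }
        // Phase 2: now that every worker exists, fill in all suppliers.
        for (LateBoundSupplier<Worker> supplier : pending) {
            supplier.setItems(workers.toArray(new Worker[0]));
        }
        return workers;
    }

    public static void main(String[] args) {
        List<Worker> workers = build(3);
        Worker first = workers.get(0);
        System.out.println(first.peers.get().name); // worker-0
        System.out.println(first.peers.get().name); // worker-1
    }
}
```

Each supplier keeps its own counter, so every worker starts its own rotation at index 0 instead of sharing one global position.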

ChannelFactory

elasticsearch-7.0.1/libs/nio/src/main/java/org/elasticsearch/nio/ChannelFactory.java

public abstract class ChannelFactory<ServerSocket extends NioServerSocketChannel, Socket extends NioSocketChannel> {

    //......

    public ServerSocket openNioServerSocketChannel(InetSocketAddress address, Supplier<NioSelector> supplier) throws IOException {
        ServerSocketChannel rawChannel = rawChannelFactory.openNioServerSocketChannel(address);
        NioSelector selector = supplier.get();
        ServerSocket serverChannel = internalCreateServerChannel(selector, rawChannel);
        scheduleServerChannel(serverChannel, selector);
        return serverChannel;
    }

    public Socket openNioChannel(InetSocketAddress remoteAddress, Supplier<NioSelector> supplier) throws IOException {
        SocketChannel rawChannel = rawChannelFactory.openNioChannel(remoteAddress);
        NioSelector selector = supplier.get();
        Socket channel = internalCreateChannel(selector, rawChannel);
        scheduleChannel(channel, selector);
        return channel;
    }

    //......
}

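  • ChannelFactory's openNioServerSocketChannel and openNioChannel methods both take a Supplier<NioSelector> parameter and call supplier.get() to choose the NioSelector on which the new channel is created and scheduled.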
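
Because the factory depends only on the Supplier interface, it never sees the selection policy itself. The sketch below shows that shape in miniature. MiniFactory and its open method are hypothetical, selectors are represented by plain strings, and the assignments map stands in for the scheduleChannel step.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical miniature of the factory shape: ask the supplier for a selector per channel.
final class MiniFactory {
    // channel name -> selector name, standing in for scheduleChannel(channel, selector)
    final Map<String, String> assignments = new LinkedHashMap<>();

    String open(String channelName, Supplier<String> selectorSupplier) {
        String selector = selectorSupplier.get();
        assignments.put(channelName, selector);
        return selector;
    }

    public static void main(String[] args) {
        String[] selectors = {"selector-0", "selector-1"};
        AtomicInteger counter = new AtomicInteger();
        Supplier<String> roundRobin =
            () -> selectors[Math.floorMod(counter.getAndIncrement(), selectors.length)];

        MiniFactory factory = new MiniFactory();
        for (int i = 0; i < 4; i++) {
            factory.open("channel-" + i, roundRobin);
        }
        System.out.println(factory.assignments);
        // {channel-0=selector-0, channel-1=selector-1, channel-2=selector-0, channel-3=selector-1}

        // Any other Supplier works as well, for example pinning a channel to one selector.
        factory.open("pinned", () -> "selector-0");
    }
}
```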

Summary

  • RoundRobinSupplier implements the Supplier interface. Its get method computes counter.getAndIncrement() % selectors.length to pick an index into the selectors array and returns the element at that index.
  • The NioSelectorGroup constructor initializes two RoundRobinSupplier fields, acceptorSupplier and selectorSupplier. bindServerChannel calls factory.openNioServerSocketChannel(address, acceptorSupplier); openChannel calls factory.openNioChannel(address, selectorSupplier).
  • ChannelFactory's openNioServerSocketChannel and openNioChannel methods both take a Supplier<NioSelector> parameter and use it to choose the NioSelector.

Reposted from: https://my.oschina.net/go4it/blog/3056757
