Elasticsearch源码阅读--节点的启动和关闭 - Go语言中文社区

Elasticsearch源码阅读--节点的启动和关闭


Elasticsearch源码阅读–节点的启动和关闭

集群启动流程

在讲解节点的启动和关闭之前先来了解一下es集群是如何启动的.

从加载第一行代码开始到整个集群启动结束,集群的状态是从Red变成Green.这期间主要经历选举主节点,主分片,数据恢复等重要阶段.

集群整体的启动流程如下图所示.

es源码解析与实战

当一个索引的主分片分配成功后,到此分片的写操作就是允许的.当一个索引所有的分片都分配成功后,该索引变为Yellow.当全部索引的主分片都分配成功后,整个集群变为Yellow.当一个索引全部分片分配成功后,该索引变为Green.当全部集群的索引的索引分片分配成功后,整个集群变为Green.

节点的启动和关闭

了解了集群的启动流程之后我们再来看一下单个节点是如何启动的.

启动流程做了什么

总体来说,接电启动流程的任务是做下面几类工作:

  • 解析配置,包括配置文件和命令行参数;
  • 检查外部环境和内部环境;
  • 初始化内部资源,创建内部模块,初始化探测器;
  • 启动各个子模块和keepalive线程.

下面我们尝试通过阅读源码来了解启动过程的详细逻辑.

启动过程代码调试

代码书签和断点组

es代码量庞大,为了更好地阅读需要合理的使用idea自带的代码书签和断点组:

  • 代码书签:给所选位置的代码做一个书签方便阅读;
  • 断点组:把某一个了流程的一系列断点分组保存;

在使用idea进行断点调试的时候默认是All模式这也就意味着当多线程的时候一个线程被阻塞其他的线程依然会继续执行,如果选用Threa模式当一个线程阻塞的时候其他的线程也会同时等待.

源码获取

源码的获取方式可以直接访问GitHub,这里最好直接clone官方的项目.直接clone可以方便我们获取各个版本的tag以便对应官方发行版的打包文件.

第一次运行

运行ES的源码需Gradle具体的操作网上有很多文档我这里就不做过多的赘述了.下面我详细介绍一下如何在idea中运行ES源码.

添加初始化参数

想要从idea中运行es我们肯定要找到main方法,最简单的套路就是直接读一读es发行版中的其中shell.打开bin/elasticsearch文件我们可以看到如下的内容

从上面的启动命令中我们可以拼装出在idea中的运行参数以及main函数所在的位置:

  • 启动参数
-Des.path.home=/Users/zhengyi/linux/Elastic/code/elasticsearch-7.7.1-code-read/elasticsearch-7.7.1
-Des.path.conf=/Users/zhengyi/linux/Elastic/code/elasticsearch-7.7.1-code-read/elasticsearch-7.7.1/config
  • Main class:org/elasticsearch/bootstrap/Elasticsearch.java

在上图命令中还引用了一个参数**$ES_JAVA_OPTS**在这个脚本中也有注释 这个参数就是我们平时启动es时所配置的jvm.options.所以在参数中增加如下内容

-Xms1g
-Xmx1g
-Dlog4j2.disable.jmx=true

将参数配置好之后尝试通过主函数启动es

启动之后会发现如下报错

org.elasticsearch.bootstrap.StartupException: java.security.AccessControlException: access denied ("java.lang.RuntimePermission" "createClassLoader")
	at org.elasticsearch.bootstrap.Elasticsearch.init(Elasticsearch.java:174) ~[classes/:?]
	at org.elasticsearch.bootstrap.Elasticsearch.execute(Elasticsearch.java:161) ~[classes/:?]
	at org.elasticsearch.cli.EnvironmentAwareCommand.execute(EnvironmentAwareCommand.java:86) ~[classes/:?]
	at org.elasticsearch.cli.Command.mainWithoutErrorHandling(Command.java:127) ~[classes/:?]
	at org.elasticsearch.cli.Command.main(Command.java:90) ~[classes/:?]
	at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:126) ~[classes/:?]
	at org.elasticsearch.bootstrap.Elasticsearch.main(Elasticsearch.java:92) ~[classes/:?]

通过百度得知这是因为没有指定java.policy文件导致的,所以我们需要将java.policy添加到启动参数中(java.policy)的内容可以根据es启动时所提示的权限需求自行修改

java.policy内容

grant {
     permission java.lang.RuntimePermission "getClassLoader";
     permission java.lang.RuntimePermission "register";
     permission java.lang.RuntimePermission "createClassLoader";
     permission java.lang.RuntimePermission "getProtectionDomain";
};
-Djava.security.policy=/Users/zhengyi/linux/Elastic/code/elasticsearch-7.7.1-code-read/elasticsearch-7.7.1/config/java.policy

调用一下本地的9200端口查看返回结果.

{
  "name" : "bogon",
  "cluster_name" : "my-application",
  "cluster_uuid" : "M0INXj4wR3uXhbSmcjAmmA",
  "version" : {
    "number" : "7.7.1",
    "build_flavor" : "unknown",
    "build_type" : "unknown",
    "build_hash" : "unknown",
    "build_date" : "unknown",
    "build_snapshot" : true,
    "lucene_version" : "8.5.1",
    "minimum_wire_compatibility_version" : "6.8.0",
    "minimum_index_compatibility_version" : "6.0.0-beta1"
  },
  "tagline" : "You Know, for Search"
}

到此为止es项目我们已经成功的从idea启动了.

main函数

    public static void main(final String[] args) throws Exception {
        overrideDnsCachePolicyProperties();
        // 初始化了一个默认的安全管理器这样就不会有失效的策略
        System.setSecurityManager(new SecurityManager() {
            ......
        });
        // 在程序的开始注册日志监听
        LogConfigurator.registerErrorListener();
        final Elasticsearch elasticsearch = new Elasticsearch();
        // 执行主逻辑流程
        int status = main(args, elasticsearch, Terminal.DEFAULT);
        // 退出处理
        exit(status);
        }
    }

这个main方法先来看两个地方一个是Elasticsearch类的构造函数初始化一个是**main()**方法的执行

Elasticsearch

构造函数中初始化了所要加载的命令从这里可以看到es所接受的命令参数有哪些

    Elasticsearch() {
        super("Starts Elasticsearch", () -> {});
        versionOption = parser.acceptsAll(Arrays.asList("V", "version"),
            "Prints Elasticsearch version information and exits");
        daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"),
            "Starts Elasticsearch in the background")
            .availableUnless(versionOption);
        pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"),
            "Creates a pid file in the specified path on start")
            .availableUnless(versionOption)
            .withRequiredArg()
            .withValuesConvertedBy(new PathConverter());
        quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"),
            "Turns off standard output/error streams logging in console")
            .availableUnless(versionOption)
            .availableUnless(daemonizeOption);
    }

main()

// Elasticsearch
public final int main(String[] args, Terminal terminal) throws Exception {
        if (addShutdownHook()) {
            // 添加关闭hook
            Runtime.getRuntime().addShutdownHook(shutdownHookThread);
        }
        // 这里实际上没有运行任何逻辑
        beforeMain.run();

        try {
            // 根据传入的参数运行程序,抛出所有的异常
            mainWithoutErrorHandling(args, terminal);
        } catch (OptionException e) {
            // print help to stderr on exceptions
            printHelp(terminal, true);
            terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            return ExitCodes.USAGE;
        } catch (UserException e) {
            if (e.exitCode == ExitCodes.USAGE) {
                printHelp(terminal, true);
            }
            if (e.getMessage() != null) {
                terminal.errorPrintln(Terminal.Verbosity.SILENT, "ERROR: " + e.getMessage());
            }
            return e.exitCode;
        }
        return ExitCodes.OK;
    }

根据上面的代码可以看到所mainWithoutErrorHandling方法会根据传入的参数运行程序,我们继续跟进去可以发现如下的一个判断:

    // Command
    void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception {
        final OptionSet options = parser.parse(args);
        // 如果传入的命令是help则正常退出
        if (options.has(helpOption)) {
            printHelp(terminal, false);
            return;
        }
        // 调用子类实现
        execute(terminal, options);
    }
    // EnvironmentAwareCommand
    protected void execute(Terminal terminal, OptionSet options) throws Exception {
        // 解析传入的参数
        putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
        putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
        putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
        // 继续调用子类实现
        execute(terminal, options, createEnv(settings));
    }

这里进入**createEnv()**方法根据名字也可以猜出这个方法是用来准备运行环境的

    public static Environment prepareEnvironment(Settings input, Map<String, String> properties, Path configPath, Supplier<String> defaultNodeName) {
        // 根据homepath 拼装yaml路径
        Path path = environment.configFile().resolve("elasticsearch.yml");
        if (Files.exists(path)) {
            try {
                output.loadFromPath(path);
            } catch (IOException e) {
                throw new SettingsException("Failed to load settings from " + path.toString(), e);
            }
        }
        // 重新初始化配置
        initializeSettings(output, input, properties);
        checkSettingsForTerminalDeprecation(output);
        finalizeSettings(output, defaultNodeName);
        // 创建Enviroment对象
        return new Environment(output.build(), configPath);
    }

    // Environment
    Environment(final Settings settings, final Path configPath, final Path tmpPath) {
        // 创建环境对象,elasticsearch.yaml配置文件
    }

到这里配置解析的过程就结束了,接下来会继续调用子类的execute继续执行es的主逻辑.

    // Elasticsearch
    protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException {
        // 判断是否包含版本查询
        // 校验目录有效性
        // 初始化 param1:是否后台运行 param2:pid文件路径 param3:是否关闭标准输出 param4:环境对象
        try {
            init(daemonize, pidFile, quiet, env);
        } catch (NodeValidationException e) {
            throw new UserException(ExitCodes.CONFIG, e.getMessage());
        }
    }

接下来一起看一下init代码里面的逻辑

    static void init(
            final boolean foreground,
            final Path pidFile,
            final boolean quiet,
            final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
        // 设置一个用户线程保证程序的运行
        // 设置一个shutdown hook 在程序退出时关闭线程
        INSTANCE = new Bootstrap();
        // 加载安全配置
        final SecureSettings keystore = loadSecureSettings(initialEnv);
        // 基于现有的以及刚刚加载的安全配置,重新创建enviroment
        final Environment environment = createEnvironment(pidFile, keystore, initialEnv.settings(), initialEnv.configFile());
        // 设置日志输出中node的name
        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
        try {
            // 初始化日志系统
            LogConfigurator.configure(environment);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
        // 判断是否需要关闭标准输出
        final boolean closeStandardStreams = (foreground == false) || quiet;
        try {
            if (closeStandardStreams) {
                final Logger rootLogger = LogManager.getRootLogger();
                final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
                if (maybeConsoleAppender != null) {
                    Loggers.removeAppender(rootLogger, maybeConsoleAppender);
                }
                closeSystOut();
            }
            // 如果lucence jar包被替换了则无法启动
            checkLucene();
            // 初始化默认的未捕获异常处理器
            Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());
            // 1.进行初始化设置
            // 2.加载modules并配置守护线程
            // 3.本地环境检查
            // 4.增加shutdown hook 关闭node以及spawner下的modules 关闭日志
            // 5.初始化node和NodeEnvironment 
            // 6.初始化plugins
            // 7.创建Node对象
            INSTANCE.setup(true, environment);
            // 1.开启需要的service
            // 2.开启keepalive线程
            INSTANCE.start();
        } catch (NodeValidationException | RuntimeException e) {
            if (e instanceof CreationException) {
            throw e;
        }
    }

出流程的启动到这里已经结束了,实际上维持es运行的是内部的线程池,我们进入到**INSTANCE.start()**方法来看一下到底有哪些逻辑被start了

        nodeService.getMonitorService().start();
        nodeConnectionsService.start();
        injector.getInstance(GatewayService.class).start();
        transportService.start();
        discovery.start();
        clusterService.start();

es的编程模式就是将需要启动的modules在setup阶段加载出来统一放到injector中.之后再start阶段通过injector获取对象然后调用start方法进行初始化,在初始化的过程中主要是初始化内部数据,创建线程池,启动线程池等操作.

开启keepalive线程,在构建Bootstrap对象的时候就创建了一个线程代码如下所示.

    /** creates a new instance */
    Bootstrap() {
        keepAliveThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    keepAliveLatch.await();
                } catch (InterruptedException e) {
                    // bail out
                }
            }
        }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
        keepAliveThread.setDaemon(false);
        // keep this thread alive (non daemon thread) until we shutdown
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                keepAliveLatch.countDown();
            }
        });
    }

其中**keepAliveLatch.await()**表示在其内部一直等待位置es进程的运行.当主线程运行结束之后keepalive线程是唯一的用户线程,作用就是保持进程运行.在Java程序中,至少要有一个用户线程.当用户线程数为零是退出进程.

节点关闭流程

    // Bootstrap
    private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
        // 判断是否需要注入shutdown hook
        if (addShutdownHook) {
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    try {
                        IOUtils.close(node, spawner);
                        LoggerContext context = (LoggerContext) LogManager.getContext(false);
                        Configurator.shutdown(context);
                        // 执行node.awaitClose() 关闭节点
                        if (node != null && node.awaitClose(10, TimeUnit.SECONDS) == false) {
                            throw new IllegalStateException("Node didn't stop within 10 seconds. " +
                                    "Any outstanding requests or tasks might get killed.");
                        }
                    } catch (IOException ex) {
                        throw new ElasticsearchException("failed to stop node", ex);
                    } catch (InterruptedException e) {
                        LogManager.getLogger(Bootstrap.class).warn("Thread got interrupted while waiting for the node to shutdown.");
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
    }
    // ThreadPool 循环关闭所有线程 调用它shutdownNow()方法
    
                        
                        
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/qq_19663899/article/details/116298660
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。

0 条评论

请先 登录 后评论

官方社群

GO教程

猜你喜欢