<small id='s1iRagy5'></small> <noframes id='haW8fcQb'>


            A look at elasticsearch's MasterFaultDetection

            admin 2019-05-18

            This article takes a look at elasticsearch's MasterFaultDetection.

            FaultDetection

            elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/FaultDetection.java

            /**
            * A base class for {@link MasterFaultDetection} & {@link NodesFaultDetection},
            * making sure both use the same setting.
            */
            public abstract class FaultDetection implements Closeable {

            private static final Logger logger = LogManager.getLogger(FaultDetection.class);

            public static final Setting<Boolean> CONNECT_ON_NETWORK_DISCONNECT_SETTING =
            Setting.boolSetting("discovery.zen.fd.connect_on_network_disconnect", false, Property.NodeScope, Property.Deprecated);
            public static final Setting<TimeValue> PING_INTERVAL_SETTING =
            Setting.positiveTimeSetting("discovery.zen.fd.ping_interval", timeValueSeconds(1), Property.NodeScope, Property.Deprecated);
            public static final Setting<TimeValue> PING_TIMEOUT_SETTING =
            Setting.timeSetting("discovery.zen.fd.ping_timeout", timeValueSeconds(30), Property.NodeScope, Property.Deprecated);
            public static final Setting<Integer> PING_RETRIES_SETTING =
            Setting.intSetting("discovery.zen.fd.ping_retries", 3, Property.NodeScope, Property.Deprecated);
            public static final Setting<Boolean> REGISTER_CONNECTION_LISTENER_SETTING =
            Setting.boolSetting("discovery.zen.fd.register_connection_listener", true, Property.NodeScope, Property.Deprecated);

            protected final ThreadPool threadPool;
            protected final ClusterName clusterName;
            protected final TransportService transportService;

            // used mainly for testing, should always be true
            protected final boolean registerConnectionListener;
            protected final FDConnectionListener connectionListener;
            protected final boolean connectOnNetworkDisconnect;

            protected final TimeValue pingInterval;
            protected final TimeValue pingRetryTimeout;
            protected final int pingRetryCount;

            public FaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService, ClusterName clusterName) {
            this.threadPool = threadPool;
            this.transportService = transportService;
            this.clusterName = clusterName;

            this.connectOnNetworkDisconnect = CONNECT_ON_NETWORK_DISCONNECT_SETTING.get(settings);
            this.pingInterval = PING_INTERVAL_SETTING.get(settings);
            this.pingRetryTimeout = PING_TIMEOUT_SETTING.get(settings);
            this.pingRetryCount = PING_RETRIES_SETTING.get(settings);
            this.registerConnectionListener = REGISTER_CONNECTION_LISTENER_SETTING.get(settings);

            this.connectionListener = new FDConnectionListener();
            if (registerConnectionListener) {
            transportService.addConnectionListener(connectionListener);
            }
            }

            @Override
            public void close() {
            transportService.removeConnectionListener(connectionListener);
            }

            /**
            * This method will be called when the {@link org.elasticsearch.transport.TransportService} raised a node disconnected event
            */
            abstract void handleTransportDisconnect(DiscoveryNode node);

            private class FDConnectionListener implements TransportConnectionListener {
            @Override
            public void onNodeDisconnected(DiscoveryNode node) {
            AbstractRunnable runnable = new AbstractRunnable() {
            @Override
            public void onFailure(Exception e) {
            logger.warn("failed to handle transport disconnect for node: {}", node);
            }

            @Override
            protected void doRun() {
            handleTransportDisconnect(node);
            }
            };
            threadPool.generic().execute(runnable);
            }
            }

            }
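            • FaultDetection implements the Closeable interface and defines FDConnectionListener. When registerConnectionListener is true, its constructor adds the FDConnectionListener to transportService, and its close method removes the FDConnectionListener from transportService again. FaultDetection also declares the abstract method handleTransportDisconnect, which the listener invokes on the generic thread pool whenever a node disconnects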
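
The register-in-constructor, remove-in-close pattern described above can be sketched in isolation. This is only a minimal model, not the elasticsearch code: `FakeTransport`, `FaultDetectionSketch` and `RecordingDetector` are hypothetical names, nodes are plain strings, and the callback runs synchronously instead of being handed to `threadPool.generic()`.

```java
import java.io.Closeable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical stand-in for TransportService: it only tracks disconnect listeners.
class FakeTransport {
    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    void addConnectionListener(Consumer<String> listener) { listeners.add(listener); }
    void removeConnectionListener(Consumer<String> listener) { listeners.remove(listener); }
    int listenerCount() { return listeners.size(); }

    // Simulates the transport layer reporting that a node went away.
    void fireDisconnected(String node) {
        for (Consumer<String> listener : listeners) {
            listener.accept(node);
        }
    }
}

// Same shape as FaultDetection: register in the constructor, unregister in close().
abstract class FaultDetectionSketch implements Closeable {
    private final FakeTransport transport;
    private final Consumer<String> connectionListener = this::handleTransportDisconnect;

    FaultDetectionSketch(FakeTransport transport, boolean registerConnectionListener) {
        this.transport = transport;
        if (registerConnectionListener) {
            transport.addConnectionListener(connectionListener);
        }
    }

    @Override
    public void close() {
        transport.removeConnectionListener(connectionListener);
    }

    // Subclasses decide what a disconnect means for them.
    abstract void handleTransportDisconnect(String node);
}

// Concrete subclass that just records the nodes it was told about.
class RecordingDetector extends FaultDetectionSketch {
    final List<String> seen = new ArrayList<>();

    RecordingDetector(FakeTransport transport, boolean register) {
        super(transport, register);
    }

    @Override
    void handleTransportDisconnect(String node) {
        seen.add(node);
    }

    public static void main(String[] args) {
        FakeTransport transport = new FakeTransport();
        RecordingDetector detector = new RecordingDetector(transport, true);
        transport.fireDisconnected("node-1"); // delivered to the detector
        detector.close();
        transport.fireDisconnected("node-2"); // ignored, the listener was removed
        System.out.println(detector.seen);
    }
}
```

Keeping the listener in a field matters here: close() has to remove the very same instance that the constructor registered.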

            MasterFaultDetection

            elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

            public class MasterFaultDetection extends FaultDetection {

            private static final Logger logger = LogManager.getLogger(MasterFaultDetection.class);

            public static final String MASTER_PING_ACTION_NAME = "internal:discovery/zen/fd/master_ping";

            public interface Listener {

            /** called when pinging the master failed, like a timeout, transport disconnects etc */
            void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason);

            }

            private final MasterService masterService;
            private final java.util.function.Supplier<ClusterState> clusterStateSupplier;
            private final CopyOnWriteArrayList<Listener> listeners = new CopyOnWriteArrayList<>();

            private volatile MasterPinger masterPinger;

            private final Object masterNodeMutex = new Object();

            private volatile DiscoveryNode masterNode;

            private volatile int retryCount;

            private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();

            public MasterFaultDetection(Settings settings, ThreadPool threadPool, TransportService transportService,
            java.util.function.Supplier<ClusterState> clusterStateSupplier, MasterService masterService,
            ClusterName clusterName) {
            super(settings, threadPool, transportService, clusterName);
            this.clusterStateSupplier = clusterStateSupplier;
            this.masterService = masterService;

            logger.debug("[master] uses ping_interval [{}], ping_timeout [{}], ping_retries [{}]", pingInterval, pingRetryTimeout,
            pingRetryCount);

            transportService.registerRequestHandler(
            MASTER_PING_ACTION_NAME, MasterPingRequest::new, ThreadPool.Names.SAME, false, false, new MasterPingRequestHandler());
            }

            @Override
            public void close() {
            super.close();
            stop("closing");
            this.listeners.clear();
            }

            @Override
            protected void handleTransportDisconnect(DiscoveryNode node) {
            synchronized (masterNodeMutex) {
            if (!node.equals(this.masterNode)) {
            return;
            }
            if (connectOnNetworkDisconnect) {
            try {
            transportService.connectToNode(node);
            // if all is well, make sure we restart the pinger
            if (masterPinger != null) {
            masterPinger.stop();
            }
            this.masterPinger = new MasterPinger();
            // we use schedule with a 0 time value to run the pinger on the pool as it will run on later
            threadPool.schedule(masterPinger, TimeValue.timeValueMillis(0), ThreadPool.Names.SAME);
            } catch (Exception e) {
            logger.trace("[master] [{}] transport disconnected (with verified connect)", masterNode);
            notifyMasterFailure(masterNode, null, "transport disconnected (with verified connect)");
            }
            } else {
            logger.trace("[master] [{}] transport disconnected", node);
            notifyMasterFailure(node, null, "transport disconnected");
            }
            }
            }

            private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
            if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {
            threadPool.generic().execute(() -> {
            for (Listener listener : listeners) {
            listener.onMasterFailure(masterNode, cause, reason);
            }
            });
            } catch (EsRejectedExecutionException e) {
            logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure, " + reason);
            }
            }

            //......
            }
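            • MasterFaultDetection extends FaultDetection; its constructor registers a MasterPingRequestHandler with transportService
            • Its handleTransportDisconnect method ignores any node other than the monitored master. When connectOnNetworkDisconnect is true it tries to reconnect to the node; if the reconnect succeeds it schedules a fresh MasterPinger (with a delay of 0), and if the reconnect fails, or connectOnNetworkDisconnect is false, it calls notifyMasterFailure
            • notifyMasterFailure calls back the onMasterFailure method of every MasterFaultDetection.Listener; the notifiedMasterFailure flag ensures this happens at most once before fault detection is stopped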
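
The once-only guard in notifyMasterFailure is easy to miss, so here is a minimal sketch of it on its own. `OnceOnlyNotifier` and its simplified two-argument `Listener` are hypothetical; the real method also takes a `Throwable`, dispatches on the generic thread pool and then calls stop.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

class OnceOnlyNotifier {
    // Simplified listener: the real interface also receives a Throwable cause.
    interface Listener {
        void onMasterFailure(String masterNode, String reason);
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();
    private final AtomicBoolean notifiedMasterFailure = new AtomicBoolean();

    void addListener(Listener listener) {
        listeners.add(listener);
    }

    // Returns true only for the single call that actually notified the listeners.
    boolean notifyMasterFailure(String masterNode, String reason) {
        if (!notifiedMasterFailure.compareAndSet(false, true)) {
            return false; // some other code path already reported this failure
        }
        for (Listener listener : listeners) {
            listener.onMasterFailure(masterNode, reason);
        }
        return true;
    }

    // Plays the role of innerStart, which clears the flag for the next master.
    void reset() {
        notifiedMasterFailure.set(false);
    }

    public static void main(String[] args) {
        OnceOnlyNotifier notifier = new OnceOnlyNotifier();
        notifier.addListener((node, reason) -> System.out.println(node + ": " + reason));
        notifier.notifyMasterFailure("master-1", "transport disconnected");
        notifier.notifyMasterFailure("master-1", "failed to ping"); // swallowed by the guard
    }
}
```

The guard is needed because a single master failure can surface through several paths at nearly the same time, for example a transport disconnect event and a failed ping.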

            MasterPingRequestHandler

            elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

             private class MasterPingRequestHandler implements TransportRequestHandler<MasterPingRequest> {

            @Override
            public void messageReceived(final MasterPingRequest request, final TransportChannel channel, Task task) throws Exception {
            final DiscoveryNodes nodes = clusterStateSupplier.get().nodes();
            // check if we are really the same master as the one we seemed to be think we are
            // this can happen if the master got "kill -9" and then another node started using the same port
            if (!request.masterNode.equals(nodes.getLocalNode())) {
            throw new ThisIsNotTheMasterYouAreLookingForException();
            }

            // ping from nodes of version < 1.4.0 will have the clustername set to null
            if (request.clusterName != null && !request.clusterName.equals(clusterName)) {
            logger.trace("master fault detection ping request is targeted for a different [{}] cluster then us [{}]",
            request.clusterName, clusterName);
            throw new ThisIsNotTheMasterYouAreLookingForException("master fault detection ping request is targeted for a different ["
            + request.clusterName + "] cluster then us [" + clusterName + "]");
            }

            // when we are elected as master or when a node joins, we use a cluster state update thread
            // to incorporate that information in the cluster state. That cluster state is published
            // before we make it available locally. This means that a master ping can come from a node
            // that has already processed the new CS but it is not known locally.
            // Therefore, if we fail we have to check again under a cluster state thread to make sure
            // all processing is finished.
            //

            if (!nodes.isLocalNodeElectedMaster() || !nodes.nodeExists(request.sourceNode)) {
            logger.trace("checking ping from {} under a cluster state thread", request.sourceNode);
            masterService.submitStateUpdateTask("master ping (from: " + request.sourceNode + ")", new ClusterStateUpdateTask() {

            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
            // if we are no longer master, fail...
            DiscoveryNodes nodes = currentState.nodes();
            if (!nodes.nodeExists(request.sourceNode)) {
            throw new NodeDoesNotExistOnMasterException();
            }
            return currentState;
            }

            @Override
            public void onNoLongerMaster(String source) {
            onFailure(source, new NotMasterException("local node is not master"));
            }

            @Override
            public void onFailure(String source, @Nullable Exception e) {
            if (e == null) {
            e = new ElasticsearchException("unknown error while processing ping");
            }
            try {
            channel.sendResponse(e);
            } catch (IOException inner) {
            inner.addSuppressed(e);
            logger.warn("error while sending ping response", inner);
            }
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
            try {
            channel.sendResponse(new MasterPingResponseResponse());
            } catch (IOException e) {
            logger.warn("error while sending ping response", e);
            }
            }
            });
            } else {
            // send a response, and note if we are connected to the master or not
            channel.sendResponse(new MasterPingResponseResponse());
            }
            }
            }

            public static class MasterPingRequest extends TransportRequest {

            public DiscoveryNode sourceNode;

            private DiscoveryNode masterNode;
            private ClusterName clusterName;

            public MasterPingRequest() {
            }

            public MasterPingRequest(DiscoveryNode sourceNode, DiscoveryNode masterNode, ClusterName clusterName) {
            this.sourceNode = sourceNode;
            this.masterNode = masterNode;
            this.clusterName = clusterName;
            }

            @Override
            public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            sourceNode = new DiscoveryNode(in);
            masterNode = new DiscoveryNode(in);
            clusterName = new ClusterName(in);
            }

            @Override
            public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            sourceNode.writeTo(out);
            masterNode.writeTo(out);
            clusterName.writeTo(out);
            }
            }

            public static class MasterPingResponseResponse extends TransportResponse {

            public MasterPingResponseResponse() {
            }

            public MasterPingResponseResponse(StreamInput in) throws IOException {
            super(in);
            }
            }
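            • MasterPingRequestHandler responds to MasterPingRequest. After checking that the request really targets the local node and the same cluster, it submits a ClusterStateUpdateTask if the local node is not the elected master or the sourceNode does not exist in the local cluster state; otherwise it returns a MasterPingResponseResponse directly
            • The execute method of the ClusterStateUpdateTask checks whether the request's sourceNode exists, and throws NodeDoesNotExistOnMasterException if it does not
            • The onNoLongerMaster method of the ClusterStateUpdateTask calls onFailure with a NotMasterException; onFailure checks whether the exception is null, creates an ElasticsearchException if so, and sends the exception as the response; clusterStateProcessed returns a MasterPingResponseResponse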
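
The branching in messageReceived can be written down as a small pure function. This is a sketch under simplifying assumptions: `MasterPingDecision` and the `Outcome` enum are hypothetical names, nodes and cluster names are plain strings, and the cluster state is reduced to a set of known node names.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

class MasterPingDecision {
    enum Outcome {
        NOT_THIS_MASTER,          // corresponds to ThisIsNotTheMasterYouAreLookingForException
        RECHECK_ON_STATE_THREAD,  // corresponds to submitting the ClusterStateUpdateTask
        RESPOND_NOW               // corresponds to sending MasterPingResponseResponse directly
    }

    static Outcome decide(String requestedMaster, String localNode,
                          String requestCluster, String localCluster,
                          boolean localIsElectedMaster,
                          Set<String> knownNodes, String sourceNode) {
        // The ping must be addressed to this very node.
        if (!requestedMaster.equals(localNode)) {
            return Outcome.NOT_THIS_MASTER;
        }
        // A null cluster name is tolerated; a different one is rejected.
        if (requestCluster != null && !requestCluster.equals(localCluster)) {
            return Outcome.NOT_THIS_MASTER;
        }
        // The local view may lag behind, so doubtful cases are re-checked later.
        if (!localIsElectedMaster || !knownNodes.contains(sourceNode)) {
            return Outcome.RECHECK_ON_STATE_THREAD;
        }
        return Outcome.RESPOND_NOW;
    }

    public static void main(String[] args) {
        Set<String> known = new HashSet<>(Arrays.asList("master", "data-1"));
        System.out.println(decide("master", "master", "es", "es", true, known, "data-1"));
        System.out.println(decide("master", "master", "es", "es", true, known, "data-2"));
    }
}
```

The re-check exists because, as the source comment explains, a pinging node may already have processed a newer cluster state than the one the master has made available locally.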

            ZenDiscovery.processNextCommittedClusterState

            elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

            public class ZenDiscovery extends AbstractLifecycleComponent implements Discovery, PingContextProvider, IncomingClusterStateListener {
            //......

            // return true if state has been sent to applier
            boolean processNextCommittedClusterState(String reason) {
            assert Thread.holdsLock(stateMutex);

            final ClusterState newClusterState = pendingStatesQueue.getNextClusterStateToProcess();
            final ClusterState currentState = committedState.get();
            // all pending states have been processed
            if (newClusterState == null) {
            return false;
            }

            assert newClusterState.nodes().getMasterNode() != null : "received a cluster state without a master";
            assert !newClusterState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock()) :
            "received a cluster state with a master block";

            if (currentState.nodes().isLocalNodeElectedMaster() && newClusterState.nodes().isLocalNodeElectedMaster() == false) {
            handleAnotherMaster(currentState, newClusterState.nodes().getMasterNode(), newClusterState.version(),
            "via a new cluster state");
            return false;
            }

            try {
            if (shouldIgnoreOrRejectNewClusterState(logger, currentState, newClusterState)) {
            String message = String.format(
            Locale.ROOT,
            "rejecting cluster state version [%d] uuid [%s] received from [%s]",
            newClusterState.version(),
            newClusterState.stateUUID(),
            newClusterState.nodes().getMasterNodeId()
            );
            throw new IllegalStateException(message);
            }
            } catch (Exception e) {
            try {
            pendingStatesQueue.markAsFailed(newClusterState, e);
            } catch (Exception inner) {
            inner.addSuppressed(e);
            logger.error(() -> new ParameterizedMessage("unexpected exception while failing [{}]", reason), inner);
            }
            return false;
            }

            if (currentState.blocks().hasGlobalBlock(noMasterBlockService.getNoMasterBlock())) {
            // its a fresh update from the master as we transition from a start of not having a master to having one
            logger.debug("got first state from fresh master [{}]", newClusterState.nodes().getMasterNodeId());
            }

            if (currentState == newClusterState) {
            return false;
            }

            committedState.set(newClusterState);

            // update failure detection only after the state has been updated to prevent race condition with handleLeaveRequest
            // and handleNodeFailure as those check the current state to determine whether the failure is to be handled by this node
            if (newClusterState.nodes().isLocalNodeElectedMaster()) {
            // update the set of nodes to ping
            nodesFD.updateNodesAndPing(newClusterState);
            } else {
            // check to see that we monitor the correct master of the cluster
            if (masterFD.masterNode() == null || !masterFD.masterNode().equals(newClusterState.nodes().getMasterNode())) {
            masterFD.restart(newClusterState.nodes().getMasterNode(),
            "new cluster state received and we are monitoring the wrong master [" + masterFD.masterNode() + "]");
            }
            }

            //......

            return true;
            }

            //......
            }
            • When the current node is not the master, ZenDiscovery's processNextCommittedClusterState method calls masterFD.restart if masterFD.masterNode() is null or differs from newClusterState.nodes().getMasterNode()

            MasterFaultDetection.restart

            elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

            public class MasterFaultDetection extends FaultDetection {

            //......

            public void restart(DiscoveryNode masterNode, String reason) {
            synchronized (masterNodeMutex) {
            if (logger.isDebugEnabled()) {
            logger.debug("[master] restarting fault detection against master [{}], reason [{}]", masterNode, reason);
            }
            innerStop();
            innerStart(masterNode);
            }
            }

            private void innerStart(final DiscoveryNode masterNode) {
            this.masterNode = masterNode;
            this.retryCount = 0;
            this.notifiedMasterFailure.set(false);
            if (masterPinger != null) {
            masterPinger.stop();
            }
            this.masterPinger = new MasterPinger();

            // we start pinging slightly later to allow the chosen master to complete it's own master election
            threadPool.schedule(masterPinger, pingInterval, ThreadPool.Names.SAME);
            }

            private void innerStop() {
            // also will stop the next ping schedule
            this.retryCount = 0;
            if (masterPinger != null) {
            masterPinger.stop();
            masterPinger = null;
            }
            this.masterNode = null;
            }

            //......
            }
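            • The restart method of MasterFaultDetection runs innerStop and then innerStart. innerStop mainly calls masterPinger.stop() and sets masterPinger and masterNode to null; innerStart resets retryCount and notifiedMasterFailure, creates a new MasterPinger, and schedules it to run after a delay of pingInterval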
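
The stop-then-start sequence can be illustrated with a scheduler abstraction that makes the behaviour observable. This is a hedged sketch only: `RestartSketch`, its `Scheduler` interface and the string-typed master are hypothetical stand-ins for `ThreadPool.schedule` and `DiscoveryNode`.

```java
import java.util.ArrayList;
import java.util.List;

class RestartSketch {
    // Hypothetical replacement for threadPool.schedule(task, delay, executorName).
    interface Scheduler {
        void schedule(Runnable task, long delayMillis);
    }

    static final class Pinger implements Runnable {
        private volatile boolean running = true;
        int runs = 0;

        void stop() { running = false; }

        @Override
        public void run() {
            if (!running) {
                return; // a stopped pinger does nothing, even if it was already scheduled
            }
            runs++;
        }
    }

    private final Object masterNodeMutex = new Object();
    private final Scheduler scheduler;
    private final long pingIntervalMillis;
    volatile String masterNode;
    volatile Pinger pinger;

    RestartSketch(Scheduler scheduler, long pingIntervalMillis) {
        this.scheduler = scheduler;
        this.pingIntervalMillis = pingIntervalMillis;
    }

    void restart(String newMaster) {
        synchronized (masterNodeMutex) {
            innerStop();
            innerStart(newMaster);
        }
    }

    private void innerStop() {
        if (pinger != null) {
            pinger.stop();
            pinger = null;
        }
        masterNode = null;
    }

    private void innerStart(String newMaster) {
        masterNode = newMaster;
        pinger = new Pinger();
        // The first ping is delayed by one interval, as in the original innerStart.
        scheduler.schedule(pinger, pingIntervalMillis);
    }

    public static void main(String[] args) {
        List<Runnable> queued = new ArrayList<>();
        RestartSketch fd = new RestartSketch((task, delay) -> queued.add(task), 1000L);
        fd.restart("master-a");
        fd.restart("master-b"); // stops the pinger created for master-a
        for (Runnable task : queued) {
            task.run();
        }
        System.out.println("now monitoring " + fd.masterNode);
    }
}
```

An already scheduled task is never cancelled here; the `running` flag turns it into a no-op. That matches the original, where MasterPinger.run checks `running` before doing anything.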

            MasterPinger

            elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/MasterFaultDetection.java

             private class MasterPinger implements Runnable {

            private volatile boolean running = true;

            public void stop() {
            this.running = false;
            }

            @Override
            public void run() {
            if (!running) {
            // return and don't spawn...
            return;
            }
            final DiscoveryNode masterToPing = masterNode;
            if (masterToPing == null) {
            // master is null, should not happen, but we are still running, so reschedule
            threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
            return;
            }

            final MasterPingRequest request = new MasterPingRequest(
            clusterStateSupplier.get().nodes().getLocalNode(), masterToPing, clusterName);
            final TransportRequestOptions options = TransportRequestOptions.builder().withType(TransportRequestOptions.Type.PING)
            .withTimeout(pingRetryTimeout).build();
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options,
            new TransportResponseHandler<MasterPingResponseResponse>() {
            @Override
            public MasterPingResponseResponse read(StreamInput in) throws IOException {
            return new MasterPingResponseResponse(in);
            }

            @Override
            public void handleResponse(MasterPingResponseResponse response) {
            if (!running) {
            return;
            }
            // reset the counter, we got a good result
            MasterFaultDetection.this.retryCount = 0;
            // check if the master node did not get switched on us..., if it did, we simply return with no reschedule
            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
            // we don't stop on disconnection from master, we keep pinging it
            threadPool.schedule(MasterPinger.this, pingInterval, ThreadPool.Names.SAME);
            }
            }

            @Override
            public void handleException(TransportException exp) {
            if (!running) {
            return;
            }
            synchronized (masterNodeMutex) {
            // check if the master node did not get switched on us...
            if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
            if (exp instanceof ConnectTransportException || exp.getCause() instanceof ConnectTransportException) {
            handleTransportDisconnect(masterToPing);
            return;
            } else if (exp.getCause() instanceof NotMasterException) {
            logger.debug("[master] pinging a master {} that is no longer a master", masterNode);
            notifyMasterFailure(masterToPing, exp, "no longer master");
            return;
            } else if (exp.getCause() instanceof ThisIsNotTheMasterYouAreLookingForException) {
            logger.debug("[master] pinging a master {} that is not the master", masterNode);
            notifyMasterFailure(masterToPing, exp,"not master");
            return;
            } else if (exp.getCause() instanceof NodeDoesNotExistOnMasterException) {
            logger.debug("[master] pinging a master {} but we do not exists on it, act as if its master failure"
            , masterNode);
            notifyMasterFailure(masterToPing, exp,"do not exists on master, act as master failure");
            return;
            }

            int retryCount = ++MasterFaultDetection.this.retryCount;
            logger.trace(() -> new ParameterizedMessage(
            "[master] failed to ping [{}], retry [{}] out of [{}]",
            masterNode, retryCount, pingRetryCount), exp);
            if (retryCount >= pingRetryCount) {
            logger.debug("[master] failed to ping [{}], tried [{}] times, each with maximum [{}] timeout",
            masterNode, pingRetryCount, pingRetryTimeout);
            // not good, failure
            notifyMasterFailure(masterToPing, null, "failed to ping, tried [" + pingRetryCount
            + "] times, each with maximum [" + pingRetryTimeout + "] timeout");
            } else {
            // resend the request, not reschedule, rely on send timeout
            transportService.sendRequest(masterToPing, MASTER_PING_ACTION_NAME, request, options, this);
            }
            }
            }
            }

            @Override
            public String executor() {
            return ThreadPool.Names.SAME;
            }
            }
            );
            }
            }

            private void notifyMasterFailure(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
            if (notifiedMasterFailure.compareAndSet(false, true)) {
            try {
            threadPool.generic().execute(() -> {
            for (Listener listener : listeners) {
            listener.onMasterFailure(masterNode, cause, reason);
            }
            });
            } catch (EsRejectedExecutionException e) {
            logger.error("master failure notification was rejected, it's highly likely the node is shutting down", e);
            }
            stop("master failure, " + reason);
            }
            }
            • MasterPinger's run method returns immediately if the pinger has been stopped. It then checks whether masterToPing is null: if it is, it re-schedules the MasterPinger after pingInterval; otherwise it sends a MasterPingRequest to masterToPing
            • The handleResponse method of the TransportResponseHandler resets MasterFaultDetection.this.retryCount to 0, then checks whether masterNode has changed; if it has not, it schedules the next MasterPinger run
            • The handleException method of the TransportResponseHandler handles the exception only if masterNode has not changed: a ConnectTransportException leads to handleTransportDisconnect; NotMasterException, ThisIsNotTheMasterYouAreLookingForException and NodeDoesNotExistOnMasterException lead to notifyMasterFailure; any other exception triggers a retry
            • On retry it first increments MasterFaultDetection.this.retryCount; if the retry count is greater than or equal to pingRetryCount it calls notifyMasterFailure directly, otherwise it resends the MasterPingRequest
            • notifyMasterFailure calls back the onMasterFailure method of every MasterFaultDetection.Listener
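
The retry bookkeeping above can be modelled as a loop over a sequence of ping results. This is a rough sketch, not the real handler: `PingRetrySketch` and its `Result` enum are hypothetical, all "fatal" exceptions are collapsed into `NOT_MASTER`, and the time estimate assumes every failed attempt uses its full timeout and ignores the pingInterval wait.

```java
class PingRetrySketch {
    enum Result { OK, TIMEOUT, NOT_MASTER }

    // Returns the index of the attempt at which master failure is declared, or -1 if never.
    static int failureDeclaredAt(Result[] results, int pingRetryCount) {
        int retryCount = 0;
        for (int i = 0; i < results.length; i++) {
            switch (results[i]) {
                case OK:
                    retryCount = 0; // a good response resets the counter
                    break;
                case NOT_MASTER:
                    return i;       // fatal answers are reported without any retry
                case TIMEOUT:
                    if (++retryCount >= pingRetryCount) {
                        return i;   // too many consecutive failures
                    }
                    break;
            }
        }
        return -1;
    }

    // Upper bound on the time spent in consecutive timed-out pings before giving up.
    static long worstCaseMillis(long pingTimeoutMillis, int pingRetryCount) {
        return pingTimeoutMillis * pingRetryCount;
    }

    public static void main(String[] args) {
        Result[] flaky = {
            Result.TIMEOUT, Result.TIMEOUT, Result.OK,
            Result.TIMEOUT, Result.TIMEOUT, Result.TIMEOUT
        };
        System.out.println(failureDeclaredAt(flaky, 3));
        // Defaults from FaultDetection: ping_timeout 30s, ping_retries 3.
        System.out.println(worstCaseMillis(30_000L, 3));
    }
}
```

Under these assumptions, and with the default ping_timeout of 30s and ping_retries of 3, a master that silently stops answering is declared failed after roughly 90 seconds of timed-out pings.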

            ZenDiscovery.MasterNodeFailureListener

            elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/discovery/zen/ZenDiscovery.java

             private class MasterNodeFailureListener implements MasterFaultDetection.Listener {

            @Override
            public void onMasterFailure(DiscoveryNode masterNode, Throwable cause, String reason) {
            handleMasterGone(masterNode, cause, reason);
            }
            }

            private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
            if (lifecycleState() != Lifecycle.State.STARTED) {
            // not started, ignore a master failure
            return;
            }
            if (localNodeMaster()) {
            // we might get this on both a master telling us shutting down, and then the disconnect failure
            return;
            }

            logger.info(() -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);

            synchronized (stateMutex) {
            if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
            // flush any pending cluster states from old master, so it will not be set as master again
            pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
            rejoin("master left (reason = " + reason + ")");
            }
            }
            }

            protected void rejoin(String reason) {
            assert Thread.holdsLock(stateMutex);
            ClusterState clusterState = committedState.get();

            logger.warn("{}, current nodes: {}", reason, clusterState.nodes());
            nodesFD.stop();
            masterFD.stop(reason);

            // TODO: do we want to force a new thread if we actively removed the master? this is to give a full pinging cycle
            // before a decision is made.
            joinThreadControl.startNewThreadIfNotRunning();

            if (clusterState.nodes().getMasterNodeId() != null) {
            // remove block if it already exists before adding new one
            assert clusterState.blocks().hasGlobalBlockWithId(noMasterBlockService.getNoMasterBlock().id()) == false :
            "NO_MASTER_BLOCK should only be added by ZenDiscovery";
            ClusterBlocks clusterBlocks = ClusterBlocks.builder().blocks(clusterState.blocks())
            .addGlobalBlock(noMasterBlockService.getNoMasterBlock())
            .build();

            DiscoveryNodes discoveryNodes = new DiscoveryNodes.Builder(clusterState.nodes()).masterNodeId(null).build();
            clusterState = ClusterState.builder(clusterState)
            .blocks(clusterBlocks)
            .nodes(discoveryNodes)
            .build();

            committedState.set(clusterState);
            clusterApplier.onNewClusterState(reason, this::clusterState, (source, e) -> {}); // don't wait for state to be applied
            }
            }

            private class JoinThreadControl {

            private final AtomicBoolean running = new AtomicBoolean(false);
            private final AtomicReference<Thread> currentJoinThread = new AtomicReference<>();

            /** returns true if join thread control is started and there is currently an active join thread */
            public boolean joinThreadActive() {
            Thread currentThread = currentJoinThread.get();
            return running.get() && currentThread != null && currentThread.isAlive();
            }

            /** returns true if join thread control is started and the supplied thread is the currently active joinThread */
            public boolean joinThreadActive(Thread joinThread) {
            return running.get() && joinThread.equals(currentJoinThread.get());
            }

            /** cleans any running joining thread and calls {@link #rejoin} */
            public void stopRunningThreadAndRejoin(String reason) {
            assert Thread.holdsLock(stateMutex);
            currentJoinThread.set(null);
            rejoin(reason);
            }

            /** starts a new joining thread if there is no currently active one and join thread controlling is started */
            public void startNewThreadIfNotRunning() {
            assert Thread.holdsLock(stateMutex);
            if (joinThreadActive()) {
            return;
            }
            threadPool.generic().execute(new Runnable() {
            @Override
            public void run() {
            Thread currentThread = Thread.currentThread();
            if (!currentJoinThread.compareAndSet(null, currentThread)) {
            return;
            }
            while (running.get() && joinThreadActive(currentThread)) {
            try {
            innerJoinCluster();
            return;
            } catch (Exception e) {
            logger.error("unexpected error while joining cluster, trying again", e);
            // Because we catch any exception here, we want to know in
            // tests if an uncaught exception got to this point and the test infra uncaught exception
            // leak detection can catch this. In practise no uncaught exception should leak
            assert ExceptionsHelper.reThrowIfNotNull(e);
            }
            }
            // cleaning the current thread from currentJoinThread is done by explicit calls.
            }
            });
            }

            /**
            * marks the given joinThread as completed and makes sure another thread is running (starting one if needed)
            * If the given thread is not the currently running join thread, the command is ignored.
            */
            public void markThreadAsDoneAndStartNew(Thread joinThread) {
            assert Thread.holdsLock(stateMutex);
            if (!markThreadAsDone(joinThread)) {
            return;
            }
            startNewThreadIfNotRunning();
            }

            /** marks the given joinThread as completed. Returns false if the supplied thread is not the currently active join thread */
            public boolean markThreadAsDone(Thread joinThread) {
            assert Thread.holdsLock(stateMutex);
            return currentJoinThread.compareAndSet(joinThread, null);
            }

            public void stop() {
            running.set(false);
            Thread joinThread = currentJoinThread.getAndSet(null);
            if (joinThread != null) {
            joinThread.interrupt();
            }
            }

            public void start() {
            running.set(true);
            }

            }
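            • ZenDiscovery's MasterNodeFailureListener implements the MasterFaultDetection.Listener interface; its onMasterFailure method calls handleMasterGone, which mainly calls pendingStatesQueue.failAllStatesAndClear and then performs a rejoin
            • The rejoin method first calls nodesFD.stop() and masterFD.stop(reason), then triggers joinThreadControl.startNewThreadIfNotRunning(), and finally builds a new clusterState and calls clusterApplier.onNewClusterState
            • joinThreadControl.startNewThreadIfNotRunning() submits a task to the generic thread pool that mainly runs innerJoinCluster, retrying after an exception for as long as the control is running and the thread is still the active join thread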
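
The single-join-thread guard used by JoinThreadControl can be looked at in isolation. The class below is a minimal sketch, not the Elasticsearch code: the name JoinSlotSketch and its methods tryClaim, isActive and markDone are hypothetical, and it keeps only the AtomicBoolean and AtomicReference bookkeeping, leaving out stateMutex, the thread pool and innerJoinCluster.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical, simplified stand-in for the bookkeeping in JoinThreadControl. */
class JoinSlotSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicReference<Thread> current = new AtomicReference<>();

    void start() {
        running.set(true);
    }

    /** Claims the slot only if it is empty, like the compareAndSet(null, currentThread) in run(). */
    boolean tryClaim(Thread t) {
        return current.compareAndSet(null, t);
    }

    /** True if the control is started and t owns the slot, like joinThreadActive(Thread). */
    boolean isActive(Thread t) {
        return running.get() && t.equals(current.get());
    }

    /** Releases the slot only if t is the current owner, like markThreadAsDone. */
    boolean markDone(Thread t) {
        return current.compareAndSet(t, null);
    }

    /** Clears the running flag and interrupts whichever thread holds the slot, like stop(). */
    void stop() {
        running.set(false);
        Thread t = current.getAndSet(null);
        if (t != null) {
            t.interrupt();
        }
    }
}
```

Because the slot is claimed and released with compareAndSet, a second thread that loses the race simply returns, and a stale thread cannot clear a slot that now belongs to another thread.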

            Summary

            • FaultDetection implements the Closeable interface and defines FDConnectionListener. When registerConnectionListener is true, its constructor adds an FDConnectionListener to transportService, and its close method removes that listener from transportService. FaultDetection also defines the abstract method handleTransportDisconnect
            • MasterFaultDetection extends FaultDetection; its constructor registers MasterPingRequestHandler with transportService. When connectOnNetworkDisconnect is true, its handleTransportDisconnect method tries to reconnect to the node: if the reconnect succeeds it re-registers the delayed MasterPinger task, and if the reconnect fails, or connectOnNetworkDisconnect is false, it calls notifyMasterFailure. notifyMasterFailure in turn calls back onMasterFailure on MasterFaultDetection.Listener
            • ZenDiscovery's MasterNodeFailureListener implements the MasterFaultDetection.Listener interface; its onMasterFailure method calls handleMasterGone, which mainly calls pendingStatesQueue.failAllStatesAndClear and then performs a rejoin
            • When the current node is not the master, ZenDiscovery's processNextCommittedClusterState method calls masterFD.restart if masterFD.masterNode() is null or differs from newClusterState.nodes().getMasterNode()
            • MasterFaultDetection's restart method calls innerStop and then innerStart. innerStop mainly calls masterPinger.stop() and sets masterPinger and masterNode to null; innerStart creates a MasterPinger and registers it as a delayed task that runs after pingInterval
            • MasterPinger's run method first checks whether masterToPing is null. If it is null, it registers the delayed MasterPinger task again; otherwise it sends a MasterPingRequest to masterToPing. On success it resets MasterFaultDetection.this.retryCount and then checks whether masterNode has changed; if it has not, it registers the delayed MasterPinger task again. On failure it handles the exception by type: it calls handleTransportDisconnect, calls notifyMasterFailure, or retries
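
The retry bookkeeping described in the last point can be sketched as a small state machine. This is a hedged illustration, not the MasterPinger implementation: PingRetrySketch, runOnce and maxRetries are hypothetical names, the ping is reduced to a BooleanSupplier, and the exception-specific branches (handleTransportDisconnect and the immediate notifyMasterFailure) are left out. It assumes that failure is reported once the number of consecutive failed pings reaches a configured limit.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical sketch of ping retry counting; not the Elasticsearch MasterPinger. */
class PingRetrySketch {
    private final int maxRetries;           // assumed limit on consecutive failed pings
    private final BooleanSupplier ping;     // returns true when the simulated master answers
    private final Runnable onMasterFailure; // stands in for notifyMasterFailure
    private int retryCount = 0;
    private boolean stopped = false;

    PingRetrySketch(int maxRetries, BooleanSupplier ping, Runnable onMasterFailure) {
        this.maxRetries = maxRetries;
        this.ping = ping;
        this.onMasterFailure = onMasterFailure;
    }

    /** Runs one ping round; returns true if pinging should continue. */
    boolean runOnce() {
        if (stopped) {
            return false;
        }
        if (ping.getAsBoolean()) {
            retryCount = 0; // a successful ping clears the counter
            return true;
        }
        retryCount++;
        if (retryCount >= maxRetries) {
            stopped = true; // report the failure only once
            onMasterFailure.run();
            return false;
        }
        return true; // below the limit: ping again
    }

    int retryCount() {
        return retryCount;
    }
}
```

A caller would invoke runOnce() on each scheduled tick and stop scheduling once it returns false, which is roughly the point at which a listener such as MasterNodeFailureListener would take over.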

            doc

            • Cluster fault detection