Skip to content

负载均衡方案 #173

@melin

Description

@melin

image

Server 注册服务host:port

import com.gitee.melin.bee.util.NetUtils;
import com.gitee.melin.bee.util.ThreadUtils;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.env.Environment;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Registers this server instance in Redis and keeps the registration alive.
 *
 * <p>On startup a key of the form {@code superior-member-<profile>_<host>:<port>} is written
 * with a short expiry, and a single daemon thread rewrites it periodically. When the bean is
 * destroyed the refresh thread is stopped, so the key is left to expire on its own.
 */
@Service
public class RedisDiscoveryClusterMember implements InitializingBean, DisposableBean {

    private static final Logger LOG = LoggerFactory.getLogger(RedisDiscoveryClusterMember.class);

    /** Expiry applied to the membership key on every write. */
    private static final long MEMBER_TTL_SECONDS = 2;

    /** Pause between two consecutive refreshes of the membership key. */
    private static final long HEARTBEAT_INTERVAL_MILLIS = 100;

    private final AtomicBoolean running = new AtomicBoolean(true);

    @Autowired
    private RedisTemplate<String, String> redisTemplate;

    @Autowired
    private Environment environment;

    @Value("${server.port}")
    private int serverPort;

    /** Executor running the heartbeat loop; kept as a field so {@link #destroy()} can stop it. */
    private volatile ExecutorService executorService;

    /**
     * Stops the heartbeat loop and shuts down its executor.
     */
    @Override
    public void destroy() throws Exception {
        running.set(false);
        ExecutorService executor = executorService;
        if (executor != null) {
            // Interrupts the sleeping heartbeat thread so it exits promptly.
            executor.shutdownNow();
        }
    }

    /**
     * Writes the initial membership key and starts the background heartbeat.
     *
     * @throws IllegalArgumentException if none of the profiles {@code dev}, {@code test},
     *                                  {@code production} is active
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        final String profile = resolveProfile();
        final String hostName = NetUtils.getLocalHost() + ":" + serverPort;
        final String key = "superior-member-" + profile + "_" + hostName;
        redisTemplate.opsForValue().set(key, hostName, MEMBER_TTL_SECONDS, TimeUnit.SECONDS);

        executorService = ThreadUtils.newDaemonSingleThreadExecutor("update-jobserver-node");
        executorService.submit(() -> heartbeatLoop(key, hostName));
    }

    /** Returns the first of dev/test/production that is active, or throws if none is. */
    private String resolveProfile() {
        final String[] activeProfiles = environment.getActiveProfiles();
        if (ArrayUtils.contains(activeProfiles, "dev")) {
            return "dev";
        }
        if (ArrayUtils.contains(activeProfiles, "test")) {
            return "test";
        }
        if (ArrayUtils.contains(activeProfiles, "production")) {
            return "production";
        }
        throw new IllegalArgumentException("profile 值不正确");
    }

    /** Rewrites the membership key until the bean is destroyed or the thread is interrupted. */
    private void heartbeatLoop(String key, String hostName) {
        while (running.get()) {
            try {
                redisTemplate.opsForValue().set(key, hostName, MEMBER_TTL_SECONDS, TimeUnit.SECONDS);
            } catch (Throwable e) {
                // Best effort: keep the loop alive, but keep the stack trace for diagnosis.
                LOG.error("更新集群成员信息失败: {}", e.getMessage(), e);
            }

            // Sleep outside the try above so a failing Redis call cannot turn this into a busy loop.
            try {
                TimeUnit.MILLISECONDS.sleep(HEARTBEAT_INTERVAL_MILLIS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}

**Client 负载均衡相关代码**

import com.netflix.loadbalancer.Server;
import com.netflix.loadbalancer.ServerList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Ribbon {@link ServerList} whose content comes from {@link RedisDiscoveryClusterMember}.
 *
 * <p>Both the initial and the updated list are produced by the same lookup.
 */
@Service
public class DiscoveryServerList implements ServerList<Server> {

    private static final Logger LOG = LoggerFactory.getLogger(DiscoveryServerList.class);

    @Autowired
    private RedisDiscoveryClusterMember clusterMember;

    /**
     * @return the currently known servers; never {@code null}
     */
    @Override
    public List<Server> getInitialListOfServers() {
        return getServers();
    }

    /**
     * @return the currently known servers; never {@code null}
     */
    @Override
    public List<Server> getUpdatedListOfServers() {
        return getServers();
    }

    /**
     * Converts the URIs reported by the cluster member into {@link Server} instances.
     *
     * @return one server per URI, or an empty list when no URI is available
     */
    private List<Server> getServers() {
        List<String> uris = clusterMember.getSuperiorUris();

        // A missing list is treated like an empty one so callers never see null.
        if (uris == null || uris.isEmpty()) {
            // Typed factory instead of the raw Collections.EMPTY_LIST constant.
            return Collections.emptyList();
        }
        return uris.stream().map(Server::new).collect(Collectors.toList());
    }
}
import com.netflix.loadbalancer.IPing;
import com.netflix.loadbalancer.Server;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;

import java.io.IOException;

/**
 * Ribbon {@link IPing} that probes a server with an HTTP GET.
 *
 * <p>The probed URL is {@code <scheme>://<server id><pingAppendString>}. A server counts as alive
 * when the response status is 200, unless {@link #getExpectedContent()} returns a non-null
 * value, in which case the response body must equal that value.
 */
public class SuperiorHealthCheck implements IPing {

    private static final Logger LOGGER = LoggerFactory.getLogger(SuperiorHealthCheck.class);

    /** Connect, connection-request and socket timeouts are all 3 seconds. */
    private static final RequestConfig requestConfig = RequestConfig.custom().
            setConnectTimeout(3 * 1000).
            setConnectionRequestTimeout(3 * 1000).
            setSocketTimeout(3 * 1000).build();

    private String pingAppendString = "";

    private boolean isSecure = false;

    private final String expectedContent = null;

    public SuperiorHealthCheck() {}

    /**
     * @param isSecure         {@code true} to probe with https, {@code false} for http
     * @param pingAppendString path appended to the server id; {@code null} is treated as empty
     */
    public SuperiorHealthCheck(boolean isSecure, String pingAppendString) {
        this.isSecure = isSecure;
        this.pingAppendString = (pingAppendString != null) ? pingAppendString : "";
    }

    /**
     * @param pingAppendString path appended to the server id; {@code null} is treated as empty
     */
    public void setPingAppendString(String pingAppendString) {
        this.pingAppendString = (pingAppendString != null) ? pingAppendString : "";
    }

    public String getPingAppendString() {
        return pingAppendString;
    }

    public String getExpectedContent() {
        return expectedContent;
    }

    /**
     * Probes the given server once.
     *
     * @param server the server to check
     * @return {@code true} if the server answered as described on the class; {@code false} on
     *         any I/O failure, when the probe URL is malformed, or when {@code server} or its
     *         id is {@code null}
     */
    @Override
    public boolean isAlive(Server server) {
        if (server == null || server.getId() == null) {
            return false;
        }

        final String uri = (isSecure ? "https://" : "http://") + server.getId() + getPingAppendString();

        final HttpUriRequest getRequest;
        try {
            getRequest = new HttpGet(uri);
        } catch (IllegalArgumentException e) {
            // A malformed registry entry must mark the server as down, not break the ping cycle.
            LOGGER.debug("invalid ping uri: {}", uri, e);
            return false;
        }

        boolean isAlive = false;
        StopWatch watch = new StopWatch();
        watch.start();
        try (CloseableHttpClient httpClient = HttpClientBuilder.create()
                .setDefaultRequestConfig(requestConfig).build()) {

            HttpResponse response = httpClient.execute(getRequest);
            // A response without an entity has no body to read; EntityUtils rejects a null entity.
            String content = (response.getEntity() != null)
                    ? EntityUtils.toString(response.getEntity())
                    : null;
            isAlive = (response.getStatusLine().getStatusCode() == 200);

            String expected = getExpectedContent();
            if (expected != null) {
                LOGGER.debug("content:" + content);
                // When an expected body is configured it alone decides the outcome.
                isAlive = expected.equals(content);
            }
        } catch (IOException e) {
            watch.stop();
            LOGGER.debug("ping server failure: {}, times: {}ms", uri, watch.getTotalTimeMillis());
        } finally {
            getRequest.abort();
        }

        return isAlive;
    }
}
import com.github.lianjiatech.retrofit.spring.boot.core.ServiceInstanceChooser;
import io.github.melin.jobserver.spark.api.JobServerException;
import com.netflix.loadbalancer.*;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.net.URI;

/**
 * {@link ServiceInstanceChooser} backed by a Ribbon load balancer.
 *
 * <p>The balancer is built once the bean's dependencies are injected. It uses round-robin
 * selection, {@link SuperiorHealthCheck} against {@code /ok} as its ping, and a polling updater
 * that refreshes the server list from {@link DiscoveryServerList}.
 */
@Service
public class SuperiorServiceInstanceChooser implements ServiceInstanceChooser, InitializingBean {

    private static final long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;

    private static final int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 3 * 1000; // msecs;

    /** Scheme used when the chosen server carries none; matches the plain-http health check. */
    private static final String DEFAULT_SCHEME = "http";

    private ILoadBalancer loadBalancer;

    @Autowired
    private DiscoveryServerList discoveryServerList;

    /**
     * Builds the dynamic-server-list load balancer.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        SuperiorHealthCheck healthCheck = new SuperiorHealthCheck();
        healthCheck.setPingAppendString("/ok");
        loadBalancer = LoadBalancerBuilder.newBuilder()
                .withRule(new RoundRobinRule())
                .withPing(healthCheck)
                .withDynamicServerList(discoveryServerList)
                .withServerListUpdater(new PollingServerListUpdater(LISTOFSERVERS_CACHE_UPDATE_DELAY,
                        LISTOFSERVERS_CACHE_REPEAT_INTERVAL))
                .buildDynamicServerListLoadBalancerWithUpdater();
    }

    /**
     * Picks a server and returns its base URI.
     *
     * @param serviceId not used for the selection; the balancer is always asked with the fixed
     *                  key {@code "sparkjobserver"}
     * @return {@code <scheme>://<host>:<port>} of the chosen server
     * @throws JobServerException if the load balancer has no server to offer
     */
    @Override
    public URI choose(String serviceId) {
        Server server = loadBalancer.chooseServer("sparkjobserver");
        if (server == null) {
            throw new JobServerException("不能自动探查到 superior server");
        }

        // Without this fallback a server that has no scheme would yield the URI "null://host:port".
        String scheme = server.getScheme();
        if (scheme == null) {
            scheme = DEFAULT_SCHEME;
        }
        return URI.create(scheme + "://" + server.getHostPort());
    }
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions