先说下结论,只要集群中的工作节点过半,有候选的master节点,挂掉的节点中不同时包含索引的主分片和副分片,那么ES是可以做到让业务无感知的进行主副分片切换的。

蓝胖子会先讲解下ES集群写入文档的原理,并对异常情况进行分析,最后来模拟集群写入过程中节点宕机的情况来对这个问题展开讨论。

主副分片的写入流程

之前我在Elasticsearch 如何保证写入过程中不丢失数据的 有提到ES 通过translog 保证了segment在写入完成后即使会在内存停留一段时间也不会因为宕机而丢失数据。但是没有提到ES在写入时,副本和主节点之间的关系,现在把这部分补充完整。

如下图所示,有3个es节点,es03是整个集群的master节点,同时也承担数据节点的角色,es01,es02都是数据节点,同时也可以成为master节点候选者。

es client客户端发送插入文档的请求,默认是随机选取集群中的一个节点进行发送,假设请求发送给了es02,由于插入和修改操作只能由主分片进行,此时主分片又是es03节点,所以es02这个时候就充当了协调节点的角色,对客户端的请求进行了转发,转发给了es03

正常情况下es03 会在主分片对数据进行写入,写入成功后,再将插入请求转发到副本节点进行复制,等待副本节点回应后(无论成功还是失败都会回应)会将此次写入数据的结果返回给协调节点es03,es03再将结果返回给客户端。

分片写入过程中的异常处理流程

上面是一个正常的流程,现在来看一下一个异常的情况,如下图所示,在协调节点es02转发请求给es03时,es03 在处理分片写入过程中宕机了,这个时候,客户端的此次写入会失败掉吗?

es03 节点上的是主分片,这里有必要对es03节点宕机的时机进行分析,

1,es03节点在主分片写入前宕机。

2,es03 节点在主分片写入成功后,还没来得及向副分片写入数据就宕机。

3,es03节点在副分片也写入成功后,就宕机。

以上是主分片的3种宕机的情况。

接着,回到写入流程上,es03节点宕机了,es02的http 转发请求将会失败,此时es02节点在收到失败后,将会进行重试,并且由于此时是主分片,且是master节点宕机,所以,es02节点会等待整个集群重新选出主分片和主节点后,再会进行重试

需要注意的是,如果es03节点宕机的时机是主分片写入前,或者是还没来得及向副分片进行复制就宕机,那么重试不会有任何问题,新的主分片将会拥有写入失败的数据。如下图所示,

如果es03节点在宕机前已经完成了副分片的复制,es01节点已经拥有了这条插入的数据,那么协调节点es02的重试会导致多插入一条数据吗?

其实是不会的,在Es节点内部,每个文档都会有一个_idversion_id 标记唯一一个文档,version 用于版本更新,ES的更新是基于乐观锁机制,并发更新时有可能返回致版本错误,需要业务方进行重试,更新成功后文档的version字段会进行加1。

在协调节点转发插入请求前,便会生成_idversion 字段,所以如果es03节点已经将插入的数据复制到了副分片,那么es02节点在进行重试发请求时,请求到达es01节点,会在文档中发现相同的_idversion的文档,则会认为数据已经插入了,忽略掉这次重试的插入请求。ES集群会正常的进行插入数据,后续客户端的插入请求都将被转发到es01节点的主分片进行插入。客户端除了直接请求es03节点,不会察觉到es03节点挂了。

上面是在文档写入过程中节点宕机的情况,除以以外,如果es03节点如果没有在处理请求就宕机了,那么集群会重新选主节点和主分片,如果选举过程还没结束,此时客户端就发送来插入请求,那么会等待选举结束后,es02才会继续转发请求到新主分片节点。

所以,你可以看到,Es节点可以通过重试和等待主分片选举实现让业务无感知的进行主副分片的切换。

压测集群模拟节点宕机

下面我用docker compose 启动一个3节点集群,然后对集群进行并发插入,接着模拟节点 宕机,主动kill掉一个节点,来看看最后插入的数据是否和并发插入数据是吻合的,并且插入过程没有报错。

启动一个es集群,

version: '2.2'  
services:  
  es01:  
    image: docker.elastic.co/elasticsearch/elasticsearch:7.14.2  
    container_name: es01  
    environment:  
      - node.name=es01  
      - cluster.name=es-docker-cluster  
      - discovery.seed_hosts=es02,es03  
      - cluster.initial_master_nodes=es01,es02,es03  
      - bootstrap.memory_lock=true  
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"  
    ulimits:  
      memlock:  
        soft: -1  
        hard: -1  
    ports:  
      - 9200:9200  
  es02:  
    image: docker.elastic.co/elasticsearch/elasticsearch:7.14.2  
    container_name: es02  
    environment:  
      - node.name=es02  
      - cluster.name=es-docker-cluster  
      - discovery.seed_hosts=es01,es03  
      - cluster.initial_master_nodes=es01,es02,es03  
      - bootstrap.memory_lock=true  
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"  
    ulimits:  
      memlock:  
        soft: -1  
        hard: -1  
  
    ports:  
      - 9201:9200  
  es03:  
    image: docker.elastic.co/elasticsearch/elasticsearch:7.14.2  
    container_name: es03  
    environment:  
      - node.name=es03  
      - cluster.name=es-docker-cluster  
      - discovery.seed_hosts=es01,es02  
      - cluster.initial_master_nodes=es01,es02,es03  
      - bootstrap.memory_lock=true  
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"  
    ulimits:  
      memlock:  
        soft: -1  
        hard: -1  
    ports:  
      - 9202:9200

启动之后,创建了一个名字叫做cd的索引,

PUT /cd
{
  "mappings": {
    "properties": {
      "info":{
        "type": "text"
      },
      "age":{
      "type":"integer"
    },
      "email":{
        "type": "keyword",
        "index": false
      },
      "name":{
        "type": "object",
        "properties": {
          "firstName": {
            "type": "keyword"
          },
          "lastName": {
            "type": "keyword"
          }
        }
      }
    }
  }  
}

通过kibana进行查看,目前的主节点es03

接着,我用golang写了一个程序总共发出10万的插入请求,100并发进行发出,代码如下,

package main  
  
import (  
    "fmt"  
    "github.com/google/uuid"   
     "io/ioutil"    
     "net/http"   
      "strings"   
       "sync")  
  
func main() {  
    wg := sync.WaitGroup{}  
    for i := 1; i <= 100000; i++ {  
       if i%100 == 0 {  
          wg.Wait()  
       }  
       wg.Add(1)  
       go func() {  
          defer wg.Done()  
          insert()  
       }()  
    }  
    wg.Wait()  
}  
  
func insert() {  
    url := "http://localhost:9201/cd/_doc"  
    method := "POST"  
  
    id := uuid.New().String()  
    payload := strings.NewReader(`{  
  "info":"` + id + `",  
  "email":"12345@136.com",  "name":{    "firstName":"张",  
    "lastName":"四"  
  },  "age":18}`)  
  
    client := &http.Client{}  
    req, err := http.NewRequest(method, url, payload)  
  
    if err != nil {  
       fmt.Println(err)  
       return  
    }  
    req.Header.Add("Content-Type", "application/json")  
  
    res, err := client.Do(req)  
    if err != nil {  
       fmt.Println(err, "错误了", id)  
       return  
    }  
    defer res.Body.Close()  
  
    body, err := ioutil.ReadAll(res.Body)  
    if err != nil {  
       fmt.Println(err)  
       return  
    }  
    if res.StatusCode != 201 {  
       fmt.Println(string(body), res.StatusCode, id)  
    }  
}

程序启动后,就开始并发的往集群中进行插入,此时我手动进入es03容器内部,然后kill掉了es进程。接着,此时索引的主副分片发生了变化,如下图所示,索引的主分片变到了节点es01上,而集群的master节点变成了es02。

整个程序此时还在进行,并未有任何报错。

此时我又手动重启了挂掉的es03,让其自动进行恢复recovery ,节点状态变化如下,可以看到es03已经变成副分片了。

最后,等待并发程序结束后,我向集群中对插入的数据行数进行查询,的确是10万条数据,说明数据没有发生丢失,并且,整个过程,客户端也没有报错。

足以说明,ES的主副分片的确对业务是无感知的了。的确是妙啊。对比起mysql的可靠性优先的主备切换方式,数据库会有小段的时间不可写,重试机制由ES帮我们做了。