RPC 实战与核心原理分析_rpcx-程序员宅基地

技术标签: Golang  rpc  网络  网络协议  TCP/IP协议  

RPC 实战与核心原理分析

  • RPCX是一个分布式的Go语言的 RPC 框架,支持Zookepper、etcd、consul多种服务发现方式,多种服务路由方式,

例子

服务端

package main

import (
"flag"

example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)

var addr = flag.String("addr", "localhost:8972", "server address")

func main() {
    
flag.Parse()

s := server.NewServer()
// s.RegisterName("Arith", new(example.Arith), "")
s.Register(new(example.Arith), "")
s.Serve("tcp", *addr)
}

客户端

package main

import (
	"context"
	"flag"
	"log"
	"time"

	example "github.com/rpcxio/rpcx-examples"
	"github.com/smallnest/rpcx/client"
)

var (
	addr = flag.String("addr", "localhost:8972", "server address")
)

func main() {
    
	flag.Parse()

	d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
	xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
	defer xclient.Close()

	args := &example.Args{
    
		A: 10,
		B: 20,
	}

	for {
    
		reply := &example.Reply{
    }
		err := xclient.Call(context.Background(), "Mul", args, reply)
		if err != nil {
    
			log.Fatalf("failed to call: %v", err)
		}

		log.Printf("%d * %d = %d", args.A, args.B, reply.C)
		time.Sleep(1e9)
	}

}

这是一个简单的例子,使用RPCX实现了服务端和客户端。

RPCX实例分析

如何发布自己的服务

在上面的例子中,我们只需要用service.NewServer()→ RegisterName → Serve 就可以开始一个rpc服务端。我们也可以使用rpcx的插件功能,将服务端的地址注册到持久化存储当中,这样client 就可以使用服务发现的功能来对服务端进行rpc调用。

使用RegistryPlugin的方式进行服务注册时,rpcx 会在服务启动前将服务注册到注册中心中。

// RegisterPlugin is .
// 注册中心接口定义,如果你要实现自己的注册中心,就需要实现这个RegisterPlugin接口
RegisterPlugin interface {
    
	Register(name string, rcvr interface{
    }, metadata string) error
	Unregister(name string) error
}

// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (s *Server) RegisterName(name string, rcvr interface{
    }, metadata string) error {
    
	_, err := s.register(rcvr, name, true)
	if err != nil {
    
		return err
	}
	if s.Plugins == nil {
    
		s.Plugins = &pluginContainer{
    }
	}
    // RegistryPlugin 执行,进行服务注册
	return s.Plugins.DoRegister(name, rcvr, metadata)
}
  • rpcx 支持 ZooKeeper,Etcd,mDNS,Consul,Redis,DNS等方式进行服务注册。例子可以参考:https://github.com/rpcxio/rpcx-examples/tree/master/registry。

如何调用他人的远程服务

我们知道,一个RPC的调用会涉及到两端,一个是Client,一个是Server端。两端的基本工作如下

Server 端工作

  • 监听端口
  • 响应连接请求
  • 接收数据包
  • 解析数据包
  • 调用相应方法
  • 组装请求处理结果数据包
  • 发送结果数据包
  • 序列化协议
服务端如何接收客户端请求
serveListener
  • 服务端在监听端口之后,会有一个serveListener 函数,
  • 这个函数一个for循环 Accept 客户端过来的连接,
  • 最后会调用 serveConn 来处理这个请求
// serveListener accepts incoming connections on the Listener ln,
// creating a new service goroutine for each.
// The service goroutines read requests and then call services to reply to them.
func (s *Server) serveListener(ln net.Listener) error {
    
	var tempDelay time.Duration

	s.mu.Lock()
	s.ln = ln
	s.mu.Unlock()

	for {
    
        // 接受客户端过来的连接
		conn, e := ln.Accept()         
        // 省略代码........   
        if tc, ok := conn.(*net.TCPConn); ok {
    
			period := s.options["TCPKeepAlivePeriod"]
			if period != nil {
    
				tc.SetKeepAlive(true)
				tc.SetKeepAlivePeriod(period.(time.Duration))
				tc.SetLinger(10)
			}
		}
        // 处理请求之前的操作
		conn, ok := s.Plugins.DoPostConnAccept(conn)
		if !ok {
    
			conn.Close()
			continue
		}
        // 保存到这个请求
		s.mu.Lock()
		s.activeConn[conn] = struct{
    }{
    }
		s.mu.Unlock()

		if share.Trace {
    
			log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())
		}
        // 处理连接请求
		go s.serveConn(conn)
	}
}
serveConn

serveConn 是读取和处理用户过来的请求,如果开启了异步写,服务端会异步response给client,
其中 readRequest 从网络连接中读取到客户端过来的数据,这里会包含解码解压缩操作
processOneRequest 就是真正处理客户端请求的函数了,这里会通过反射调用服务的方法

func (s *Server) serveConn(conn net.Conn) {
    

    // 省略代码........

	r := bufio.NewReaderSize(conn, ReaderBuffsize)
    // 开启异步response给client
	var writeCh chan *[]byte
	if s.AsyncWrite {
    
		writeCh = make(chan *[]byte, 1)
		defer close(writeCh)
		go s.serveAsyncWrite(conn, writeCh)
	}

	// read requests and handle it
    // 读取和处理请求
	for {
    
        // 省略代码......
        // 读取请求
		req, err := s.readRequest(ctx, r)
	    // 处理请求
		if s.pool != nil {
    
			s.pool.Submit(func() {
    
				s.processOneRequest(ctx, req, conn, writeCh)
			})
		} else {
    
			go s.processOneRequest(ctx, req, conn, writeCh)
		}
	}
}
服务端解析客户端请求
  • 协议详解见:RPCX协议详解
  • 在Decode中会进行协议解析
func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Message, err error) {
    
    // 处理request之前的操作
	err = s.Plugins.DoPreReadRequest(ctx)
	if err != nil {
    
		return nil, err
	}
	// pool req?
	req = protocol.GetPooledMsg()
    // 解码解压缩
	err = req.Decode(r)
	if err == io.EOF {
    
		return req, err
	}
    // 处理request之前的操作
	perr := s.Plugins.DoPostReadRequest(ctx, req, err)
	if err == nil {
    
		err = perr
	}
	return req, err
}

如果配置了压缩方式,也会对服务的数据进行解压缩

func (m *Message) Decode(r io.Reader) error {
    

    // 前面都是协议的解析

    // 如果配置了压缩方式,对服务的数据进行解压缩到 Payload 中
	if m.CompressType() != None {
    
		compressor := Compressors[m.CompressType()]
		if compressor == nil {
    
			return ErrUnsupportedCompressor
		}
		m.Payload, err = compressor.Unzip(m.Payload)
		if err != nil {
    
			return err
		}
	}

	return err
}
服务端调用相应方法和返回结果给客户端端

接受到用户的请求之后,就是调用服务端对应的方法了,在用户消息Decode 中,服务端已经把ServicePath、ServiceMethod 解析出来了,剩下的就是只要找到对应的方法就可以执行

	m.ServicePath = util.SliceByteToString(data[n:nEnd])
	m.ServiceMethod = util.SliceByteToString(data[n:nEnd])

前面在 servConn 中的函数 processOneRequest 会处理Client的请求,我们来看下,这里是怎么处理的,

  • 先解析出 Payload 中的 请求参数,
  • 然后用参数去调用 service 这个函数service := s.serviceMap[serviceName] 就是 服务注册过去的业务函数。
  • 然后用sendResponse 发送responseclient
func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
    
	serviceName := req.ServicePath
	methodName := req.ServiceMethod

    // 代码省略....

	// get a argv object from object pool
	argv := reflectTypePools.Get(mtype.ArgType)

	codec := share.Codecs[req.SerializeType()]
	if codec == nil {
    
		err = fmt.Errorf("can not find codec for %d", req.SerializeType())
		return s.handleError(res, err)
	}
    // 解析用户的请求参数
	err = codec.Decode(req.Payload, argv)
	if err != nil {
    
		return s.handleError(res, err)
	}

	argv, err = s.Plugins.DoPreCall(ctx, serviceName, methodName, argv)
	if err != nil {
    
		// return reply to object pool
		reflectTypePools.Put(mtype.ReplyType, replyv)
		return s.handleError(res, err)
	}
    // 调用service中注册的业务函数,处理业务方式
	if mtype.ArgType.Kind() != reflect.Ptr {
    
		err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
	} else {
    
		err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
	}

   // 代码省略....
	return res, nil
}

Client 端工作

  • 建立与Server的连接
  • 组装数据
  • 发送数据包
  • 接收处理结果数据包
  • 解析返回数据包
客户端和服务端的连接是怎样建立的

client 在调用 server的时候,只需要,NewXClient,然后调用Call 就可以请求对应服务的方法。

NewXClient
  • 通过服务发现的方式获取到服务器列表
  • 选择一种负载均衡的方式
  • 监听服务列表的变化
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
    
	client := &xClient{
    
		failMode:     failMode,
		selectMode:   selectMode,
		discovery:    discovery,
		servicePath:  servicePath,
		cachedClient: make(map[string]RPCClient),
		option:       option,
	}
    // 通过服务发现的方式获取到服务器列表
	pairs := discovery.GetServices()
	sort.Slice(pairs, func(i, j int) bool {
    
		return strings.Compare(pairs[i].Key, pairs[j].Key) <= 0
	})
	servers := make(map[string]string, len(pairs))
	for _, p := range pairs {
    
		servers[p.Key] = p.Value
	}
	filterByStateAndGroup(client.option.Group, servers)

    // 选择一种负载均衡的方式
	client.servers = servers
	if selectMode != Closest && selectMode != SelectByUser {
    
		client.selector = newSelector(selectMode, servers)
	}

	client.Plugins = &pluginContainer{
    }

   // 监听服务列表的变化
	ch := client.discovery.WatchService()
	if ch != nil {
    
		client.ch = ch
		go client.watch(ch)
	}

	return client
}
Call
call 方法做了两件事
  • selectClient:根据负载均衡的算法选出一个服务器
  • 与服务端建立连接。
func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, args interface{
    }) (RPCClient, error) {
    

    // 先通过缓存拿到RPCClient,如果能拿到,就不需要再新建连接了
	c.mu.Lock()
	client = c.findCachedClient(k, servicePath, serviceMethod)
	if client != nil {
    
		if !client.IsClosing() && !client.IsShutdown() {
    
			c.mu.Unlock()
			return client, nil
		}
		c.deleteCachedClient(client, k, servicePath, serviceMethod)
	}
    // 为什么这里会再查一次缓存,有可能在第一次findCachedClient的时候还没有缓存,
    // 但是之前后有连接建立了,新建了缓存,这里做一个Double Check
	client = c.findCachedClient(k, servicePath, serviceMethod)
	c.mu.Unlock()

	if client == nil || client.IsShutdown() {
    
		c.mu.Lock()
		generatedClient, err, _ := c.slGroup.Do(k, func() (interface{
    }, error) {
    
            // 去建立连接
			return c.generateClient(k, servicePath, serviceMethod)
		})
		c.mu.Unlock()

		c.slGroup.Forget(k)
		if err != nil {
    
			return nil, err
		}

		client = generatedClient.(RPCClient)
		if c.Plugins != nil {
    
			needCallPlugin = true
		}

		client.RegisterServerMessageChan(c.serverMessageChan)

		c.mu.Lock()
		c.setCachedClient(client, k, servicePath, serviceMethod)
		c.mu.Unlock()
	}

	return client, nil
}

generateClient 函数最底层又调用了 RPCClient.Connect,Connect 实现了各种网络类型的连接方式,http,ws,wss,或者自定义,也在这里会有一个client.input() 读取返回的数据, client.heartbeat 发送心跳请求

// Connect connects the server via specified network.
func (client *Client) Connect(network, address string) error {
    
	var conn net.Conn
	var err error
    // 实现了各种网络类型的连接方式,http,ws,wss,或者自定义
    // 默认是tcp
	switch network {
    
	case "http":
		conn, err = newDirectHTTPConn(client, network, address)
	case "ws", "wss":
		conn, err = newDirectWSConn(client, network, address)
	default:
		fn := ConnFactories[network]
		if fn != nil {
    
            // 自定义的连接方式
			conn, err = fn(client, network, address)
		} else {
    
            // tcp
			conn, err = newDirectConn(client, network, address)
		}
	}

	if err == nil && conn != nil {
    
		if tc, ok := conn.(*net.TCPConn); ok && client.option.TCPKeepAlivePeriod > 0 {
    
			_ = tc.SetKeepAlive(true)
			_ = tc.SetKeepAlivePeriod(client.option.TCPKeepAlivePeriod)
		}

		if client.option.IdleTimeout != 0 {
    
			_ = conn.SetDeadline(time.Now().Add(client.option.IdleTimeout))
		}

		if client.Plugins != nil {
    
			conn, err = client.Plugins.DoConnCreated(conn)
			if err != nil {
    
				return err
			}
		}

		client.Conn = conn
		client.r = bufio.NewReaderSize(conn, ReaderBuffsize)
		// c.w = bufio.NewWriterSize(conn, WriterBuffsize)

		// start reading and writing since connected
        // 读取返回的数据
		go client.input()

		if client.option.Heartbeat && client.option.HeartbeatInterval > 0 {
    
			go client.heartbeat()
		}

	}

	if err != nil && client.Plugins != nil {
    
		client.Plugins.DoConnCreateFailed(network, address)
	}

	return err
}
客户端是怎样给服务端发送数据的
  • wrapCall:建立连接之后,发送请求给服务端,最底层其实是调用了 client.send(ctx, call)

  • 首先会利用pending 一个map 结构 记录当前未结束的请求

  • 然后将当前的序列号+1

  • 对请求进行编码

  • 对服务方请求,把请求给发送到服务端

func (client *Client) send(ctx context.Context, call *Call) {
    

    // .......

    // 代码省略......
    // 记录当前未结束的请求
	if client.pending == nil {
    
		client.pending = make(map[uint64]*Call)
	}

	seq := client.seq
    // 序列号+1
	client.seq++
	client.pending[seq] = call
	client.mutex.Unlock()

	if cseq, ok := ctx.Value(seqKey{
    }).(*uint64); ok {
    
		*cseq = seq
	}

    // 对请求进行编码 .....
	data, err := codec.Encode(call.Args)
    // 代码省略......
    // 对服务方请求,这个时候就是把请求给发送过去了
	_, err = client.Conn.Write(*allData)

    // 代码省略......

}
客户端是怎样获取服务端的返回数据的

前面我们在阅读代码的时候可以看到,在 generateClient 的时候,会起一个协程执行 client.input(),这个client.input() 其实就是读取服务端返回数据的协程。

  • input 对不同的几类请求都有处理:

    • 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
      是一个request 返回的错误请求。则需要处理错误
    • 默认是一个reqeust 的 response
func (client *Client) input() {
    
	var err error

	for err == nil {
    
        // 代码省略......

        // 解码
		err = res.Decode(client.r)
		if err != nil {
    
			break
		}
		if client.Plugins != nil {
    
			_ = client.Plugins.DoClientAfterDecode(res)
		}

		 // 代码省略......

        seq := res.Seq()
		var call *Call
		isServerMessage := (res.MessageType() == protocol.Request && !res.IsHeartbeat() && res.IsOneway())
		if !isServerMessage {
    
			client.mutex.Lock()
			call = client.pending[seq]
			delete(client.pending, seq)
			client.mutex.Unlock()
		}

         // 针对不同的请求进行处理
		switch {
    
		case call == nil:
         // 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
			if isServerMessage {
    
				if client.ServerMessageChan != nil {
    
					client.handleServerRequest(res)
				}
				continue
			}
		case res.MessageStatusType() == protocol.Error:
            // 是一个request 返回的错误请求。则需要处理错误
			// We've got an error response. Give this to the request
			if len(res.Metadata) > 0 {
    
				call.ResMetadata = res.Metadata

				// convert server error to a customized error, which implements ServerError interface
				if ClientErrorFunc != nil {
    
					call.Error = ClientErrorFunc(res.Metadata[protocol.ServiceError])
				} else {
    
					call.Error = strErr(res.Metadata[protocol.ServiceError])
				}

			}

			if call.Raw {
    
				call.Metadata, call.Reply, _ = convertRes2Raw(res)
				call.Metadata[XErrorMessage] = call.Error.Error()
			} else if len(res.Payload) > 0 {
    
				data := res.Payload
				codec := share.Codecs[res.SerializeType()]
				if codec != nil {
    
					_ = codec.Decode(data, call.Reply)
				}
			}
			call.done()
		default:
            // 默认就是一个request 的 response 请求
			if call.Raw {
    
				call.Metadata, call.Reply, _ = convertRes2Raw(res)
			} else {
    
				data := res.Payload
				if len(data) > 0 {
    
					codec := share.Codecs[res.SerializeType()]
					if codec == nil {
    
						call.Error = strErr(ErrUnsupportedCodec.Error())
					} else {
    
						err = codec.Decode(data, call.Reply)
						if err != nil {
    
							call.Error = strErr(err.Error())
						}
					}
				}
				if len(res.Metadata) > 0 {
    
					call.ResMetadata = res.Metadata
				}

			}

			call.done()
		}
	}
    // 代码省略......
}

其他

消息里为什么要有SeqID

RPCX里面对服务端的调用其实是异步的,即对于当前线程来说,将请求发送出来后,协程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:

  • 怎么让当前协程“暂停”,等结果回来后,再向后执行?
  • 如果有多个协程同时进行远程方法调用,这时建立在client server之间的socket连接上会有很多双方发送的消息传递,前后顺序也可能是随机的,server处理完结果后,将结果消息发送给client,client收到很多消息,怎么知道哪个消息结果是原先哪个协程调用的?
怎么解决呢?
  • client线程每次通过socket调用一次远程接口前,生成一个唯一的ID,即SeqID(SeqID必需保证在一个Socket连接里面是唯一的),一般常常使用uint64从0开始累计数字生成唯一ID;
  • 存放到client的pending里面(requestID, call);
  • 当协程异步发送消息后,紧接着执行call.Done()的方法试图获取远程返回的结果。在Done()内部,会等待 chan *Call
  • 服务端接收到请求并处理后,将response结果(此结果中包含了前面的SeqID)发送给客户端,客户端socket连接上专门监听消息的线程收到消息,分析结果,取到SeqID,再从前面的pending里面get(SeqID),从而找到Call 对象,再将结果发送到chan *Call 中
  • 客户端从Done()取到Call 对象,处理返回的结果

RPCX 核心功能

连接管理

  • 保持与服务提供方长连接,用于传输请求数据也返回结果。

RPCX 的连接管理比较简单,其实就是用一个map将连接对存储起来。

client 端
type xClient struct {
    
    // key: 是根据:servicePath, serviceMethod,args 由不同的负载均衡算法算出来的,
    // RPCClient 就是真正的持有连接的Client
	cachedClient map[string]RPCClient
}

负载均衡

  • 确保多个服务提供方节点流量均匀/合理,支持节点扩容与灰度发布。RPCX 提供了多种负载均衡的方式。轮训,随机,带权重,一致性Hash,网络质量,地理位置。你也可以通过自己实现 Select 接口来实现自己的负载均衡方式,
  • 自己实现的负载均衡的方式的话,你就可以自己实现请求路由,达到应用隔离,读写分离,灰度发布中的作用
type Selector interface {
    
	Select(ctx context.Context, servicePath, serviceMethod string, args interface{
    }) string // SelectFunc
	UpdateServer(servers map[string]string)
}

// 负载均衡的实现
func newSelector(selectMode SelectMode, servers map[string]string) Selector {
    
	switch selectMode {
    
    // 随机     
	case RandomSelect:
		return newRandomSelector(servers)
    // 轮训
	case RoundRobin:
		return newRoundRobinSelector(servers)
    // 带权重    
	case WeightedRoundRobin:
		return newWeightedRoundRobinSelector(servers)
    // 网络质量
	case WeightedICMP:
		return newWeightedICMPSelector(servers)
    // 一致性Hash
	case ConsistentHash:
		return newConsistentHashSelector(servers)
    // 
	case SelectByUser:
		return nil
	default:
    // 随机
		return newRandomSelector(servers)
	}
}

超时处理

  • 对于长时间没有返回的请求,需要作出异常处理,及时释放资源。RPCX的超市处理其实是从两个方面来做的
    • 在底层网络连接上,通过SetDeadline 对读写请求设置读写超时时间
    • 在RPCX框架层面通过select + ctx 实现超时处理

服务保护

  • 服务提供方为保证正常运行,主动丢弃超出处理能力外的请求。常见的有限流、熔断、流量丢弃这几种方式
    • 限流:对他人调用自己服务的限制,常见的限流算法有:固定窗口, 滑动窗口,令牌桶算法,漏桶限流算法。
    • 熔断: 当调用他人服务出现问题的时候,主动放弃调用其他服务。Rpcx 提供了一个简单的断路器 ConsecCircuitBreaker, 它在连续失败一定次数后就会断开,再经过一段时间后打开。 你可以将你的断路器设置到 Option.Breaker中。
    • 流量丢弃:框架实现,当RPC Server 端挤压的请求较多时,RPC框架直接将这个请求丢弃。在Go中,可以用一个带缓冲的channel 实现,当缓冲区满了时,直接将流量丢弃。

失败重试

  • rpcx支持四种调用失败模式,用来处理服务调用失败后的处理逻辑, 你可以在创建XClient的时候设置它。

    • Failfast: 一旦调用一个节点失败, rpcx立即会返回错误。 注意这个错误不是业务上的 Error, 业务上服务端返回的Error应该正常返回给客户端,这里的错误可能是网络错误或者服务异常。默认策略就是.
      Failfast
    • Failtry:rpcx如果调用一个节点的服务出现错误, 它也会尝试,但是还是选择这个节点进行重试, 直到节点正常返回数据或者达到最大重试次数。
    • Failover: rpcx如果遇到错误,它会尝试调用另外一个节点, 直到服务节点能正常返回信息,或者达到最大的重试次数。 重试测试Retries在参数Option中设置, 默认设置为3。
    • Failbackup:如果服务节点在一定的时间内不返回结果, rpcx客户端会发送相同的请求到另外一个节点, 只要这两个节点有一个返回, rpcx就算调用成功。这种通过资源换取延迟的方式可以参看 Jeff Dean的文章 Achieving Rapid Response Times in Large Online Services,这种实现非常重要,我们重点来看这是怎么实现的
Failbackup 重试模式
  • 生成两个Call,用来接受可能的两次rpc请求的结果
  • 第一个请求开始
  • 设置第一个请求多久没有返回才开启第二个请求的时间
  • select 监听 ctx是否退出,第一个请求, 是否达到设置的请求时间第一个请求还没有返回
    • 如果ctx Done。则直接返回
    • 如果第一个请求的结果返回,则处理结果
    • 如果达到设置的请求时间第一个请求还没有返回,则开始第二个请求
  • 开始发送第二个请求
  • select 就需要 ctx是否退出,第一个请求,第二个请求。看哪一个数据先准备好
    • 如果ctx Done。则直接返回
    • 如果第一个请求的结果返回,则处理结果
    • 如果第二个请求的结果返回,则处理结果
// Failbackup 重试模式
case Failbackup:
		ctx, cancelFn := context.WithCancel(ctx)
		defer cancelFn()
        // 生成两个Call,用来接受可能的两次rpc请求的结果
		call1 := make(chan *Call, 10)
		call2 := make(chan *Call, 10)
		var reply1, reply2 interface{
    }

		if reply != nil {
    
			reply1 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
			reply2 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
		}
        // 第一个请求开始
		_, err1 := c.Go(ctx, serviceMethod, args, reply1, call1)

        // 第一个请求多久没有返回才开启第二个请求
		t := time.NewTimer(c.option.BackupLatency)
        // select 监听 ctx,call,t 看谁先返回数据
		select {
    
        // 如果ctx Done。则直接返回
		case <-ctx.Done(): // cancel by context
			err = ctx.Err()
			return err
        // 第一个请求的结果
		case call := <-call1:
			err = call.Error
            // 有请求数据返回
			if err == nil && reply != nil {
    
                // 设置给reply
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
			}
            // 结束
			return err
         // 如果t的时间先到,则这里什么也不做,而直接走到下面的代码   
		case <-t.C:

		}
        // 这里开始发送第二个请求
		_, err2 := c.Go(ctx, serviceMethod, args, reply2, call2)
		if err2 != nil {
    
			if uncoverError(err2) {
    
				c.removeClient(k, c.servicePath, serviceMethod, client)
			}
			err = err1
			return err
		}
        // 这里select 就需要监听 ctx,第一个请求call1,第二个请求call2。看哪一个数据先准备好
		select {
    
		case <-ctx.Done(): // cancel by context
			err = ctx.Err()
		case call := <-call1:
			err = call.Error
			if err == nil && reply != nil && reply1 != nil {
    
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
			}
		case call := <-call2:
			err = call.Error
			if err == nil && reply != nil && reply2 != nil {
    
				reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply2).Elem())
			}
		}
		return err

总结

  • 通过分析了RPCX框架的实现原理,我们大概知道了一次RPC请求到底发生了什么事情。我们也可以知道

  • RPC 主要用于公司内部的服务调用,性能消耗低,传输效率高,实现复杂。

  • HTTP 主要用于对外的异构环境,浏览器接口调用,App 接口调用,第三方接口调用等。

  • RPC 使用场景(大型的网站,内部子系统较多、接口非常多的情况下适合使用 RPC):

    • 长链接。不必每次通信都要像 HTTP 一样去 3 次握手,减少了网络开销。
    • 注册发布机制。RPC 框架一般都有注册中心,有丰富的监控管理;发布、下线接口、动态扩展等,对调用方来说是无感知、统一化的操作。
    • 安全性,没有暴露资源操作。
    • 微服务支持。就是最近流行的服务化架构、服务化治理,RPC 框架是一个强力的支撑。
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/baidu_32452525/article/details/126975916

智能推荐

oracle 12c 集群安装后的检查_12c查看crs状态-程序员宅基地

文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态

解决jupyter notebook无法找到虚拟环境的问题_jupyter没有pytorch环境-程序员宅基地

文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境

国内安装scoop的保姆教程_scoop-cn-程序员宅基地

文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn

Element ui colorpicker在Vue中的使用_vue el-color-picker-程序员宅基地

文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker

迅为iTOP-4412精英版之烧写内核移植后的镜像_exynos 4412 刷机-程序员宅基地

文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机

Linux系统配置jdk_linux配置jdk-程序员宅基地

文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk

随便推点

matlab(4):特殊符号的输入_matlab微米怎么输入-程序员宅基地

文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入

C语言程序设计-文件(打开与关闭、顺序、二进制读写)-程序员宅基地

文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。‍ Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。

Touchdesigner自学笔记之三_touchdesigner怎么让一个模型跟着鼠标移动-程序员宅基地

文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动

【附源码】基于java的校园停车场管理系统的设计与实现61m0e9计算机毕设SSM_基于java技术的停车场管理系统实现与设计-程序员宅基地

文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计

Android系统播放器MediaPlayer源码分析_android多媒体播放源码分析 时序图-程序员宅基地

文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;amp;gt;Jni-&amp;amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图

java 数据结构与算法 ——快速排序法-程序员宅基地

文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法