技术标签: Golang rpc 网络 网络协议 TCP/IP协议
RPCX
是一个分布式的Go语言的 RPC 框架,支持Zookepper、etcd、consul多种服务发现方式,多种服务路由方式,package main
import (
"flag"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/server"
)
var addr = flag.String("addr", "localhost:8972", "server address")
func main() {
flag.Parse()
s := server.NewServer()
// s.RegisterName("Arith", new(example.Arith), "")
s.Register(new(example.Arith), "")
s.Serve("tcp", *addr)
}
package main
import (
"context"
"flag"
"log"
"time"
example "github.com/rpcxio/rpcx-examples"
"github.com/smallnest/rpcx/client"
)
var (
addr = flag.String("addr", "localhost:8972", "server address")
)
func main() {
flag.Parse()
d, _ := client.NewPeer2PeerDiscovery("tcp@"+*addr, "")
xclient := client.NewXClient("Arith", client.Failtry, client.RandomSelect, d, client.DefaultOption)
defer xclient.Close()
args := &example.Args{
A: 10,
B: 20,
}
for {
reply := &example.Reply{
}
err := xclient.Call(context.Background(), "Mul", args, reply)
if err != nil {
log.Fatalf("failed to call: %v", err)
}
log.Printf("%d * %d = %d", args.A, args.B, reply.C)
time.Sleep(1e9)
}
}
这是一个简单的例子,使用RPCX实现了服务端和客户端。
在上面的例子中,我们只需要用service.NewServer()→ RegisterName → Serve
就可以开始一个rpc服务端。我们也可以使用rpcx的插件功能,将服务端的地址注册到持久化存储当中,这样client 就可以使用服务发现的功能来对服务端进行rpc调用。
使用RegistryPlugin
的方式进行服务注册时,rpcx 会在服务启动前将服务注册到注册中心中。
// RegisterPlugin is .
// 注册中心接口定义,如果你要实现自己的注册中心,就需要实现这个RegisterPlugin接口
RegisterPlugin interface {
Register(name string, rcvr interface{
}, metadata string) error
Unregister(name string) error
}
// RegisterName is like Register but uses the provided name for the type
// instead of the receiver's concrete type.
func (s *Server) RegisterName(name string, rcvr interface{
}, metadata string) error {
_, err := s.register(rcvr, name, true)
if err != nil {
return err
}
if s.Plugins == nil {
s.Plugins = &pluginContainer{
}
}
// RegistryPlugin 执行,进行服务注册
return s.Plugins.DoRegister(name, rcvr, metadata)
}
我们知道,一个RPC的调用会涉及到两端,一个是Client
,一个是Server
端。两端的基本工作如下
serveListener
函数,Accept
客户端过来的连接,serveConn
来处理这个请求// serveListener accepts incoming connections on the Listener ln,
// creating a new service goroutine for each.
// The service goroutines read requests and then call services to reply to them.
func (s *Server) serveListener(ln net.Listener) error {
var tempDelay time.Duration
s.mu.Lock()
s.ln = ln
s.mu.Unlock()
for {
// 接受客户端过来的连接
conn, e := ln.Accept()
// 省略代码........
if tc, ok := conn.(*net.TCPConn); ok {
period := s.options["TCPKeepAlivePeriod"]
if period != nil {
tc.SetKeepAlive(true)
tc.SetKeepAlivePeriod(period.(time.Duration))
tc.SetLinger(10)
}
}
// 处理请求之前的操作
conn, ok := s.Plugins.DoPostConnAccept(conn)
if !ok {
conn.Close()
continue
}
// 保存到这个请求
s.mu.Lock()
s.activeConn[conn] = struct{
}{
}
s.mu.Unlock()
if share.Trace {
log.Debugf("server accepted an conn: %v", conn.RemoteAddr().String())
}
// 处理连接请求
go s.serveConn(conn)
}
}
serveConn
是读取和处理用户过来的请求,如果开启了异步写,服务端会异步response
给client,
其中 readRequest
从网络连接中读取到客户端过来的数据,这里会包含解码解压缩操作
processOneRequest
就是真正处理客户端请求的函数了,这里会通过反射调用服务的方法
func (s *Server) serveConn(conn net.Conn) {
// 省略代码........
r := bufio.NewReaderSize(conn, ReaderBuffsize)
// 开启异步response给client
var writeCh chan *[]byte
if s.AsyncWrite {
writeCh = make(chan *[]byte, 1)
defer close(writeCh)
go s.serveAsyncWrite(conn, writeCh)
}
// read requests and handle it
// 读取和处理请求
for {
// 省略代码......
// 读取请求
req, err := s.readRequest(ctx, r)
// 处理请求
if s.pool != nil {
s.pool.Submit(func() {
s.processOneRequest(ctx, req, conn, writeCh)
})
} else {
go s.processOneRequest(ctx, req, conn, writeCh)
}
}
}
func (s *Server) readRequest(ctx context.Context, r io.Reader) (req *protocol.Message, err error) {
// 处理request之前的操作
err = s.Plugins.DoPreReadRequest(ctx)
if err != nil {
return nil, err
}
// pool req?
req = protocol.GetPooledMsg()
// 解码解压缩
err = req.Decode(r)
if err == io.EOF {
return req, err
}
// 处理request之前的操作
perr := s.Plugins.DoPostReadRequest(ctx, req, err)
if err == nil {
err = perr
}
return req, err
}
如果配置了压缩方式,也会对服务的数据进行解压缩
func (m *Message) Decode(r io.Reader) error {
// 前面都是协议的解析
// 如果配置了压缩方式,对服务的数据进行解压缩到 Payload 中
if m.CompressType() != None {
compressor := Compressors[m.CompressType()]
if compressor == nil {
return ErrUnsupportedCompressor
}
m.Payload, err = compressor.Unzip(m.Payload)
if err != nil {
return err
}
}
return err
}
接受到用户的请求之后,就是调用服务端对应的方法了,在用户消息Decode
中,服务端已经把ServicePath、ServiceMethod
解析出来了,剩下的就是只要找到对应的方法就可以执行
m.ServicePath = util.SliceByteToString(data[n:nEnd])
m.ServiceMethod = util.SliceByteToString(data[n:nEnd])
前面在 servConn
中的函数 processOneRequest
会处理Client的请求,我们来看下,这里是怎么处理的,
Payload
中的 请求参数,service 这个函数
,service := s.serviceMap[serviceName]
就是 服务注册过去的业务函数。sendResponse
发送response
给client
func (s *Server) handleRequest(ctx context.Context, req *protocol.Message) (res *protocol.Message, err error) {
serviceName := req.ServicePath
methodName := req.ServiceMethod
// 代码省略....
// get a argv object from object pool
argv := reflectTypePools.Get(mtype.ArgType)
codec := share.Codecs[req.SerializeType()]
if codec == nil {
err = fmt.Errorf("can not find codec for %d", req.SerializeType())
return s.handleError(res, err)
}
// 解析用户的请求参数
err = codec.Decode(req.Payload, argv)
if err != nil {
return s.handleError(res, err)
}
argv, err = s.Plugins.DoPreCall(ctx, serviceName, methodName, argv)
if err != nil {
// return reply to object pool
reflectTypePools.Put(mtype.ReplyType, replyv)
return s.handleError(res, err)
}
// 调用service中注册的业务函数,处理业务方式
if mtype.ArgType.Kind() != reflect.Ptr {
err = service.call(ctx, mtype, reflect.ValueOf(argv).Elem(), reflect.ValueOf(replyv))
} else {
err = service.call(ctx, mtype, reflect.ValueOf(argv), reflect.ValueOf(replyv))
}
// 代码省略....
return res, nil
}
client 在调用 server的时候,只需要,NewXClient
,然后调用Call 就可以请求对应服务的方法。
func NewXClient(servicePath string, failMode FailMode, selectMode SelectMode, discovery ServiceDiscovery, option Option) XClient {
client := &xClient{
failMode: failMode,
selectMode: selectMode,
discovery: discovery,
servicePath: servicePath,
cachedClient: make(map[string]RPCClient),
option: option,
}
// 通过服务发现的方式获取到服务器列表
pairs := discovery.GetServices()
sort.Slice(pairs, func(i, j int) bool {
return strings.Compare(pairs[i].Key, pairs[j].Key) <= 0
})
servers := make(map[string]string, len(pairs))
for _, p := range pairs {
servers[p.Key] = p.Value
}
filterByStateAndGroup(client.option.Group, servers)
// 选择一种负载均衡的方式
client.servers = servers
if selectMode != Closest && selectMode != SelectByUser {
client.selector = newSelector(selectMode, servers)
}
client.Plugins = &pluginContainer{
}
// 监听服务列表的变化
ch := client.discovery.WatchService()
if ch != nil {
client.ch = ch
go client.watch(ch)
}
return client
}
func (c *xClient) getCachedClient(k string, servicePath, serviceMethod string, args interface{
}) (RPCClient, error) {
// 先通过缓存拿到RPCClient,如果能拿到,就不需要再新建连接了
c.mu.Lock()
client = c.findCachedClient(k, servicePath, serviceMethod)
if client != nil {
if !client.IsClosing() && !client.IsShutdown() {
c.mu.Unlock()
return client, nil
}
c.deleteCachedClient(client, k, servicePath, serviceMethod)
}
// 为什么这里会再查一次缓存,有可能在第一次findCachedClient的时候还没有缓存,
// 但是之前后有连接建立了,新建了缓存,这里做一个Double Check
client = c.findCachedClient(k, servicePath, serviceMethod)
c.mu.Unlock()
if client == nil || client.IsShutdown() {
c.mu.Lock()
generatedClient, err, _ := c.slGroup.Do(k, func() (interface{
}, error) {
// 去建立连接
return c.generateClient(k, servicePath, serviceMethod)
})
c.mu.Unlock()
c.slGroup.Forget(k)
if err != nil {
return nil, err
}
client = generatedClient.(RPCClient)
if c.Plugins != nil {
needCallPlugin = true
}
client.RegisterServerMessageChan(c.serverMessageChan)
c.mu.Lock()
c.setCachedClient(client, k, servicePath, serviceMethod)
c.mu.Unlock()
}
return client, nil
}
generateClient
函数最底层又调用了 RPCClient.Connect,Connect
实现了各种网络类型的连接方式,http,ws,wss
,或者自定义,也在这里会有一个client.input()
读取返回的数据, client.heartbeat
发送心跳请求
// Connect connects the server via specified network.
func (client *Client) Connect(network, address string) error {
var conn net.Conn
var err error
// 实现了各种网络类型的连接方式,http,ws,wss,或者自定义
// 默认是tcp
switch network {
case "http":
conn, err = newDirectHTTPConn(client, network, address)
case "ws", "wss":
conn, err = newDirectWSConn(client, network, address)
default:
fn := ConnFactories[network]
if fn != nil {
// 自定义的连接方式
conn, err = fn(client, network, address)
} else {
// tcp
conn, err = newDirectConn(client, network, address)
}
}
if err == nil && conn != nil {
if tc, ok := conn.(*net.TCPConn); ok && client.option.TCPKeepAlivePeriod > 0 {
_ = tc.SetKeepAlive(true)
_ = tc.SetKeepAlivePeriod(client.option.TCPKeepAlivePeriod)
}
if client.option.IdleTimeout != 0 {
_ = conn.SetDeadline(time.Now().Add(client.option.IdleTimeout))
}
if client.Plugins != nil {
conn, err = client.Plugins.DoConnCreated(conn)
if err != nil {
return err
}
}
client.Conn = conn
client.r = bufio.NewReaderSize(conn, ReaderBuffsize)
// c.w = bufio.NewWriterSize(conn, WriterBuffsize)
// start reading and writing since connected
// 读取返回的数据
go client.input()
if client.option.Heartbeat && client.option.HeartbeatInterval > 0 {
go client.heartbeat()
}
}
if err != nil && client.Plugins != nil {
client.Plugins.DoConnCreateFailed(network, address)
}
return err
}
wrapCall:建立连接之后,发送请求给服务端,最底层其实是调用了 client.send(ctx, call)
首先会利用pending 一个map 结构 记录当前未结束的请求
然后将当前的序列号+1
对请求进行编码
对服务方请求,把请求给发送到服务端
func (client *Client) send(ctx context.Context, call *Call) {
// .......
// 代码省略......
// 记录当前未结束的请求
if client.pending == nil {
client.pending = make(map[uint64]*Call)
}
seq := client.seq
// 序列号+1
client.seq++
client.pending[seq] = call
client.mutex.Unlock()
if cseq, ok := ctx.Value(seqKey{
}).(*uint64); ok {
*cseq = seq
}
// 对请求进行编码 .....
data, err := codec.Encode(call.Args)
// 代码省略......
// 对服务方请求,这个时候就是把请求给发送过去了
_, err = client.Conn.Write(*allData)
// 代码省略......
}
前面我们在阅读代码的时候可以看到,在 generateClient
的时候,会起一个协程执行 client.input()
,这个client.input()
其实就是读取服务端返回数据的协程。
input 对不同的几类请求都有处理:
func (client *Client) input() {
var err error
for err == nil {
// 代码省略......
// 解码
err = res.Decode(client.r)
if err != nil {
break
}
if client.Plugins != nil {
_ = client.Plugins.DoClientAfterDecode(res)
}
// 代码省略......
seq := res.Seq()
var call *Call
isServerMessage := (res.MessageType() == protocol.Request && !res.IsHeartbeat() && res.IsOneway())
if !isServerMessage {
client.mutex.Lock()
call = client.pending[seq]
delete(client.pending, seq)
client.mutex.Unlock()
}
// 针对不同的请求进行处理
switch {
case call == nil:
// 如果call == nil 则说明不是一个reqeust 的 response,是一个从服务端发过来的数据
if isServerMessage {
if client.ServerMessageChan != nil {
client.handleServerRequest(res)
}
continue
}
case res.MessageStatusType() == protocol.Error:
// 是一个request 返回的错误请求。则需要处理错误
// We've got an error response. Give this to the request
if len(res.Metadata) > 0 {
call.ResMetadata = res.Metadata
// convert server error to a customized error, which implements ServerError interface
if ClientErrorFunc != nil {
call.Error = ClientErrorFunc(res.Metadata[protocol.ServiceError])
} else {
call.Error = strErr(res.Metadata[protocol.ServiceError])
}
}
if call.Raw {
call.Metadata, call.Reply, _ = convertRes2Raw(res)
call.Metadata[XErrorMessage] = call.Error.Error()
} else if len(res.Payload) > 0 {
data := res.Payload
codec := share.Codecs[res.SerializeType()]
if codec != nil {
_ = codec.Decode(data, call.Reply)
}
}
call.done()
default:
// 默认就是一个request 的 response 请求
if call.Raw {
call.Metadata, call.Reply, _ = convertRes2Raw(res)
} else {
data := res.Payload
if len(data) > 0 {
codec := share.Codecs[res.SerializeType()]
if codec == nil {
call.Error = strErr(ErrUnsupportedCodec.Error())
} else {
err = codec.Decode(data, call.Reply)
if err != nil {
call.Error = strErr(err.Error())
}
}
}
if len(res.Metadata) > 0 {
call.ResMetadata = res.Metadata
}
}
call.done()
}
}
// 代码省略......
}
RPCX里面对服务端的调用其实是异步的,即对于当前线程来说,将请求发送出来后,协程就可以往后执行了,至于服务端的结果,是服务端处理完成后,再以消息的形式发送给客户端的。于是这里出现以下两个问题:
chan *Call
RPCX 的连接管理比较简单,其实就是用一个map将连接对存储起来。
type xClient struct {
// key: 是根据:servicePath, serviceMethod,args 由不同的负载均衡算法算出来的,
// RPCClient 就是真正的持有连接的Client
cachedClient map[string]RPCClient
}
type Selector interface {
Select(ctx context.Context, servicePath, serviceMethod string, args interface{
}) string // SelectFunc
UpdateServer(servers map[string]string)
}
// 负载均衡的实现
func newSelector(selectMode SelectMode, servers map[string]string) Selector {
switch selectMode {
// 随机
case RandomSelect:
return newRandomSelector(servers)
// 轮训
case RoundRobin:
return newRoundRobinSelector(servers)
// 带权重
case WeightedRoundRobin:
return newWeightedRoundRobinSelector(servers)
// 网络质量
case WeightedICMP:
return newWeightedICMPSelector(servers)
// 一致性Hash
case ConsistentHash:
return newConsistentHashSelector(servers)
//
case SelectByUser:
return nil
default:
// 随机
return newRandomSelector(servers)
}
}
rpcx支持四种调用失败模式,用来处理服务调用失败后的处理逻辑, 你可以在创建XClient的时候设置它。
// Failbackup 重试模式
case Failbackup:
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
// 生成两个Call,用来接受可能的两次rpc请求的结果
call1 := make(chan *Call, 10)
call2 := make(chan *Call, 10)
var reply1, reply2 interface{
}
if reply != nil {
reply1 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
reply2 = reflect.New(reflect.ValueOf(reply).Elem().Type()).Interface()
}
// 第一个请求开始
_, err1 := c.Go(ctx, serviceMethod, args, reply1, call1)
// 第一个请求多久没有返回才开启第二个请求
t := time.NewTimer(c.option.BackupLatency)
// select 监听 ctx,call,t 看谁先返回数据
select {
// 如果ctx Done。则直接返回
case <-ctx.Done(): // cancel by context
err = ctx.Err()
return err
// 第一个请求的结果
case call := <-call1:
err = call.Error
// 有请求数据返回
if err == nil && reply != nil {
// 设置给reply
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
}
// 结束
return err
// 如果t的时间先到,则这里什么也不做,而直接走到下面的代码
case <-t.C:
}
// 这里开始发送第二个请求
_, err2 := c.Go(ctx, serviceMethod, args, reply2, call2)
if err2 != nil {
if uncoverError(err2) {
c.removeClient(k, c.servicePath, serviceMethod, client)
}
err = err1
return err
}
// 这里select 就需要监听 ctx,第一个请求call1,第二个请求call2。看哪一个数据先准备好
select {
case <-ctx.Done(): // cancel by context
err = ctx.Err()
case call := <-call1:
err = call.Error
if err == nil && reply != nil && reply1 != nil {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply1).Elem())
}
case call := <-call2:
err = call.Error
if err == nil && reply != nil && reply2 != nil {
reflect.ValueOf(reply).Elem().Set(reflect.ValueOf(reply2).Elem())
}
}
return err
通过分析了RPCX框架的实现原理,我们大概知道了一次RPC请求到底发生了什么事情。我们也可以知道
RPC 主要用于公司内部的服务调用,性能消耗低,传输效率高,实现复杂。
HTTP 主要用于对外的异构环境,浏览器接口调用,App 接口调用,第三方接口调用等。
RPC 使用场景(大型的网站,内部子系统较多、接口非常多的情况下适合使用 RPC):
文章浏览阅读1.6k次。安装配置gi、安装数据库软件、dbca建库见下:http://blog.csdn.net/kadwf123/article/details/784299611、检查集群节点及状态:[root@rac2 ~]# olsnodes -srac1 Activerac2 Activerac3 Activerac4 Active[root@rac2 ~]_12c查看crs状态
文章浏览阅读1.3w次,点赞45次,收藏99次。我个人用的是anaconda3的一个python集成环境,自带jupyter notebook,但在我打开jupyter notebook界面后,却找不到对应的虚拟环境,原来是jupyter notebook只是通用于下载anaconda时自带的环境,其他环境要想使用必须手动下载一些库:1.首先进入到自己创建的虚拟环境(pytorch是虚拟环境的名字)activate pytorch2.在该环境下下载这个库conda install ipykernelconda install nb__jupyter没有pytorch环境
文章浏览阅读5.2k次,点赞19次,收藏28次。选择scoop纯属意外,也是无奈,因为电脑用户被锁了管理员权限,所有exe安装程序都无法安装,只可以用绿色软件,最后被我发现scoop,省去了到处下载XXX绿色版的烦恼,当然scoop里需要管理员权限的软件也跟我无缘了(譬如everything)。推荐添加dorado这个bucket镜像,里面很多中文软件,但是部分国外的软件下载地址在github,可能无法下载。以上两个是官方bucket的国内镜像,所有软件建议优先从这里下载。上面可以看到很多bucket以及软件数。如果官网登陆不了可以试一下以下方式。_scoop-cn
文章浏览阅读4.5k次,点赞2次,收藏3次。首先要有一个color-picker组件 <el-color-picker v-model="headcolor"></el-color-picker>在data里面data() { return {headcolor: ’ #278add ’ //这里可以选择一个默认的颜色} }然后在你想要改变颜色的地方用v-bind绑定就好了,例如:这里的:sty..._vue el-color-picker
文章浏览阅读640次。基于芯片日益增长的问题,所以内核开发者们引入了新的方法,就是在内核中只保留函数,而数据则不包含,由用户(应用程序员)自己把数据按照规定的格式编写,并放在约定的地方,为了不占用过多的内存,还要求数据以根精简的方式编写。boot启动时,传参给内核,告诉内核设备树文件和kernel的位置,内核启动时根据地址去找到设备树文件,再利用专用的编译器去反编译dtb文件,将dtb还原成数据结构,以供驱动的函数去调用。firmware是三星的一个固件的设备信息,因为找不到固件,所以内核启动不成功。_exynos 4412 刷机
文章浏览阅读2w次,点赞24次,收藏42次。Linux系统配置jdkLinux学习教程,Linux入门教程(超详细)_linux配置jdk
文章浏览阅读3.3k次,点赞5次,收藏19次。xlabel('\delta');ylabel('AUC');具体符号的对照表参照下图:_matlab微米怎么输入
文章浏览阅读119次。顺序读写指的是按照文件中数据的顺序进行读取或写入。对于文本文件,可以使用fgets、fputs、fscanf、fprintf等函数进行顺序读写。在C语言中,对文件的操作通常涉及文件的打开、读写以及关闭。文件的打开使用fopen函数,而关闭则使用fclose函数。在C语言中,可以使用fread和fwrite函数进行二进制读写。 Biaoge 于2024-03-09 23:51发布 阅读量:7 ️文章类型:【 C语言程序设计 】在C语言中,用于打开文件的函数是____,用于关闭文件的函数是____。
文章浏览阅读3.4k次,点赞2次,收藏13次。跟随鼠标移动的粒子以grid(SOP)为partical(SOP)的资源模板,调整后连接【Geo组合+point spirit(MAT)】,在连接【feedback组合】适当调整。影响粒子动态的节点【metaball(SOP)+force(SOP)】添加mouse in(CHOP)鼠标位置到metaball的坐标,实现鼠标影响。..._touchdesigner怎么让一个模型跟着鼠标移动
文章浏览阅读178次。项目运行环境配置:Jdk1.8 + Tomcat7.0 + Mysql + HBuilderX(Webstorm也行)+ Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。项目技术:Springboot + mybatis + Maven +mysql5.7或8.0+html+css+js等等组成,B/S模式 + Maven管理等等。环境需要1.运行环境:最好是java jdk 1.8,我们在这个平台上运行的。其他版本理论上也可以。_基于java技术的停车场管理系统实现与设计
文章浏览阅读3.5k次。前言对于MediaPlayer播放器的源码分析内容相对来说比较多,会从Java-&amp;gt;Jni-&amp;gt;C/C++慢慢分析,后面会慢慢更新。另外,博客只作为自己学习记录的一种方式,对于其他的不过多的评论。MediaPlayerDemopublic class MainActivity extends AppCompatActivity implements SurfaceHolder.Cal..._android多媒体播放源码分析 时序图
文章浏览阅读2.4k次,点赞41次,收藏13次。java 数据结构与算法 ——快速排序法_快速排序法