微服务网关 (六)网络代理详细和ReverseProxy主要源码分析

网络代理

什么是网络代理

  1. 用户通过代理请求信息
  2. 请求通过网络代理完成转发到目标服务器
  3. 目标服务器响应后再通过网络代理回传给用户

网络代理和网络转发的区别

两种方式的请求过程:

  • 网络转发:

    ​ 由图可知由客户端发出的请求中,des,即目标ip都是实际服务器的ip,而在转发途中,src的ip在经过中间路由的转发后,会将src更改为转发路由器的ip。

image-20221003163146984

  • 网络代理:

    ​ 由图可知客户端发送的请求中,des目标ip并不是实际服务器的ip,而是代理的ip。而正向、反向代理的这个服务器会去接收这个数据。首先,它会创建两个socket,一个接收来自客户端的数据,一个去连接实际服务器,它会把接收到的数据经过中间的数据处理后,发送给实际的服务器。实际的服务器在响应了这个数据后,再回传给代理服务器,代理服务器同样创建两个socket,在接收到数据后,再经过一定处理,再将数据回传给客户端。

    ​ 在这个过程中,客户端始终不知道有实际服务器的存在

    image-20221008153501330

区别:

网络代理:

​ 用户不直接连接实际的服务器,而是交给网络代理去连接。在获取了数据后,网络代理会将数据返回给用户。

网络转发:

​ 是路由器对报文的转发操作,中间有可能对数据包进行修改

正向代理(略写)forward_proxy

​ 是一种客户端的代理技术,帮助客户端访问无法访问的服务资源,可以隐藏用户的真实ip。如浏览器web代理、vpn等

大概步骤:

  1. 监听中的代理服务器在接收到客户端的请求后,会创建一个上游的tcp连接,通过回调方法,复制原请求对象,并根据其中的数据配置新的请求中的各种参数
  2. 把新请求发送到真实的服务器,并接收到服务器端的返回
  3. 代理服务器对响应做一些处理后,返回给客户端

反向代理(详写)Reverse_proxy

​ 是一种服务端的代理技术,帮助服务器做负载均衡、缓存、提供安全校验等,可以隐藏服务器的真实ip。如LVS技术、Nginx Proxy_pass等

​ 「用户」通过互联网去请求「反向代理服务器」,「反向代理服务器」去请求「真实的服务器」,再此过程中,「用户」并不知道自己请求的真实服务器地址,以及反向代理后有多少台真实服务器。用户只需要去和反向代理做交互即可。这样一来,代理服务器就可以做很多事情了,如帮助服务器做请求转发、负载、安全校验等。

大概示意图:

image-20221008160823127

大概步骤:

  1. 代理接收客户端请求,更改请求结构体信息
  2. 通过一定的负载均衡算法获取下游服务器地址
  3. 把请求发送到下游服务器,并获取返回内容
  4. 对返回内容做一些处理,返回给客户端

HTTP代理:

支持实现的功能:

  • 错误回调及错误日志处理

    ​ 遇到错误时,能有一个统一的方法去使用,以及错误的日志要做怎么样的处理

  • 更改代理返回内容

  • 负载均衡

  • URL重写

    ​ 将一个地址的前缀,转化成另一个地址的前缀

  • 限流、熔断、降级

  • 数据统计

  • 权限验证

实现途径:使用golang官方的ReverseProxy实现HTTP代理

ReverseProxy

功能点:
  • 支持更改ReverseProxy的内容
  • 支持设置错误信息回调
  • 支持自定义的负载均衡策略
  • URL重写功能
  • 支持连接池功能(即不需要每次都开辟一个新的连接池,而是可以复用连接池)
  • 支持websocket服务
  • 支持https代理
ReverseProxy结构体概览
type ReverseProxy struct {
   //控制器必须是一个函数,函数内部可以对请求进行修改,比如请求的路径,请求的参数
   Director func(*http.Request)

   //连接池,如果为nil,则使用http.DefaultTransport
   Transport http.RoundTripper

   //刷新到客户端的刷新间隔
   FlushInterval time.Duration

   //错误记录器
   ErrorLog *log.Logger

   //定义一个缓冲池,在复制http响应的时候使用,用以提高请求效率
   BufferPool BufferPool

   //修改response返回内容的函数
   //将函数格式定义为以下格式,就能对返回内容进行修改
   ModifyResponse func(*http.Response) error

   //错误回调函数,如果为nil,则默认为记录提供的错误并返回502状态错误网关响应。
   ErrorHandler func(http.ResponseWriter, *http.Request, error)
}
ReverseProxy方法
  • 核心方法:func (p *ReverseProxy) ServeHTTP
源码分析:

首先看向http.ListenAndServe方法

image-20221011110905463

它接收了一个参数handler,这个Handler的定义是

image-20221011111110366

这个Handler它需要实现一个方法,也就是ServeHTTP(ResponseWriter,*Request)方法

如果要实现对应的Handler,也就是想要往ListenAndServe中传入这个Handler,那我们就必须要实现这个接口。

所以,这就是为什么ReverseProxy要实现这个接口,ReverseProxy能够传入到ListenAndServe的原因

image-20221011111422422

image-20221011111556191

因此,我们就可以知道ReverseProxy代理回调的方法也是ServeHTTP

接下来,开始啃ServeHTTP的源码:

func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
	//验证结构体里面有没有设置过ReverseProxy的连接池,没有则使用默认连接池
    transport := p.Transport
	if transport == nil {
		transport = http.DefaultTransport
	}

    //1、验证是否请求终止
     //上下文取得信息,向下转型为CloseNotifier
     //(http.CloseNotifier是一个接口,只有一个方法CloseNotify() <-chan bool,作用是检测连接是否断开)
     //取出里面通知的一个channel,即cn.CloseNotify(),紧接着开启一个协程,一直监听这个channel是否有请求终止的消息,如果有,便执行cancel()方法
	ctx := req.Context()
	if cn, ok := rw.(http.CloseNotifier); ok {
		var cancel context.CancelFunc
		ctx, cancel = context.WithCancel(ctx)
		defer cancel()
		notifyChan := cn.CloseNotify()
		go func() {
			select {
			case <-notifyChan:
				cancel()
			case <-ctx.Done():
			}
		}()
	}

    //2、设置context信息
    //通过上游发送过来的req,重新拷贝新建一个outreq对外请求的request,可以理解为往下文请求的一个request
	outreq := req.Clone(ctx)
     //对outreq的信息做特殊处理
	if req.ContentLength == 0 {
		outreq.Body = nil // Issue 16036: nil Body for http.Transport retries
	}
	if outreq.Body != nil {
		defer outreq.Body.Close()
	}
    
    //3、深拷贝Header
	if outreq.Header == nil {
		outreq.Header = make(http.Header) // Issue 33142: historical behavior was to always allocate
	}
	
    //4、修改request,也就是之前控制器Director那里,地址和请求信息的修改拼接
	p.Director(outreq)
    //outreq.Close = false的意思是表示outreq请求到下游的链接是可以被复用的
	outreq.Close = false

    //5、Upgrade头的特殊处理
     //upgradeType(outreq.Header)取出upgrade的类型并判断是否存在
	reqUpType := upgradeType(outreq.Header)
	if !ascii.IsPrint(reqUpType) {
		p.getErrorHandler()(rw, req, fmt.Errorf("client tried to switch to invalid protocol %q", reqUpType))
		return
	}
     //删除connection的head头信息
	removeConnectionHeaders(outreq.Header)

    //逐段消息头:客户端和第一代理之间的消息头,与是否往下传递head消息头是没有关联的,往下传递的信息中不应该包含这些逐段消息头
	//删除后端的逐段消息头
	for _, h := range hopHeaders {
		outreq.Header.Del(h)
	}

	//这两个特殊消息头跳过,不进行删除
	if httpguts.HeaderValuesContainsToken(req.Header["Te"], "trailers") {
		outreq.Header.Set("Te", "trailers")
	}

	if reqUpType != "" {
		outreq.Header.Set("Connection", "Upgrade")
		outreq.Header.Set("Upgrade", reqUpType)
	}

    //6、X-Forwarded-For追加ClientIP信息
     //设置 X-Forwarded-For,以逗号+空格分隔
	if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
		prior, ok := outreq.Header["X-Forwarded-For"]
		omit := ok && prior == nil // Issue 38079: nil now means don't populate the header
		if len(prior) > 0 {
			clientIP = strings.Join(prior, ", ") + ", " + clientIP
		}
		if !omit {
			outreq.Header.Set("X-Forwarded-For", clientIP)
		}
	}
	//7、向下游请求数据,拿到响应response
	res, err := transport.RoundTrip(outreq)
	if err != nil {
		p.getErrorHandler()(rw, outreq, err)
		return
	}

	//8、处理升级协议请求
     //验证响应状态码是否为101,是才考虑升级
    // Deal with 101 Switching Protocols responses: (WebSocket, h2c, etc)
	if res.StatusCode == http.StatusSwitchingProtocols {
		if !p.modifyResponse(rw, res, outreq) {
			return
		}
        //请求升级方法(具体源码步骤见补充)
		p.handleUpgradeResponse(rw, outreq, res)
		return
	}

    //9、移除逐段消息头,删除从下游返回的无用的数据
	removeConnectionHeaders(res.Header)

	for _, h := range hopHeaders {
		res.Header.Del(h)
	}
    
    //10、修改response返回内容
	if !p.modifyResponse(rw, res, outreq) {
		return
	}

    //11、拷贝头部数据
	copyHeader(rw.Header(), res.Header)

	 //处理Trailer头部
	announcedTrailers := len(res.Trailer)
	if announcedTrailers > 0 {
		trailerKeys := make([]string, 0, len(res.Trailer))
		for k := range res.Trailer {
			trailerKeys = append(trailerKeys, k)
		}
		rw.Header().Add("Trailer", strings.Join(trailerKeys, ", "))
	}

    //12、写入状态码
	rw.WriteHeader(res.StatusCode)

    //13、按周期刷新内容到response
	err = p.copyResponse(rw, res.Body, p.flushInterval(res))
	if err != nil {
		defer res.Body.Close()
		if !shouldPanicOnCopyError(req) {
			p.logf("suppressing panic for copyResponse error in test; copy error: %v", err)
			return
		}
		panic(http.ErrAbortHandler)
	}
    //读取完body内容后,对body进行关闭
	res.Body.Close()

    //对Trailer逻辑处理
	if len(res.Trailer) > 0 {
		if fl, ok := rw.(http.Flusher); ok {
			fl.Flush()
		}
	}

	if len(res.Trailer) == announcedTrailers {
		copyHeader(rw.Header(), res.Trailer)
		return
	}

	for k, vv := range res.Trailer {
		k = http.TrailerPrefix + k
		for _, v := range vv {
			rw.Header().Add(k, v)
		}
	}
}

上文代码步骤注释如下:

  1. 验证是否请求终止

    若请求终止,我们就不会把这个服务请求下游,例如关闭浏览器、网络断开等等,那么就会终止请求

  2. 设置请求context信息

    如果上游传了部分context信息,那么我就会将这一部分的context信息做设置

  3. 深拷贝header

  4. 修改req

    这里的修改request信息就包含了请求到下游的特殊的head头信息的变更,比如X-Forwarded-For,X-Real-IP

  5. Upgrade头的特殊处理

  6. 追加ClientIP信息

    这里就是X-Forwarded-For,X-Real-IP这一块的设置

  7. 向下游请求数据

    transport、roundtrip?方法

  8. 处理升级协议请求

  9. 移除逐段头部

  10. 修改返回数据

  11. 拷贝头部的数据

  12. 写入状态码

  13. 周期刷新内容到response

  • NewSingleHostReverseProxy 创建反向代理的方法,最后传出一个控制器Director

说白了就是一个拼接罢了:

新建一个proxy,如果请求的路径是 http://127.0.0.1:2002/dir,目标rs路径是 http://127.0.0.1:2003/base,那么实际路径为 http://127.0.0.1:2003/base/dir

//新建一个proxy
//如果请求的路径是 http://127.0.0.1:2002/dir
//目标rs路径是 http://127.0.0.1:2003/base
//那么实际路径为 http://127.0.0.1:2003/base/dir
func NewSingleHostReverseProxy(target *url.URL) *ReverseProxy {
    //http://127.0.0.1:2002/dir?name=123
    //targetQuery: name=123
    //Scheme: http
    //Host: 127.0.0.1:2002
   targetQuery := target.RawQuery
   director := func(req *http.Request) {
      req.URL.Scheme = target.Scheme
      req.URL.Host = target.Host
       //joinURLPath("/base","/dir")
      req.URL.Path, req.URL.RawPath = joinURLPath(target, req.URL)
      if targetQuery == "" || req.URL.RawQuery == "" {
         req.URL.RawQuery = targetQuery + req.URL.RawQuery
      } else {
         req.URL.RawQuery = targetQuery + "&" + req.URL.RawQuery
      }
      if _, ok := req.Header["User-Agent"]; !ok {
         // explicitly disable User-Agent so it's not set to default value
         req.Header.Set("User-Agent", "")
      }
   }
   return &ReverseProxy{Director: director}
}
  • 更改返回内容方法 modifyFunc
modifyFunc := func(res *http.Response) error {
   if res.StatusCode != 200 {
      return errors.New("error statusCode")
      //从res.Body拿到返回内容
      oldPayload, err := ioutil.ReadAll(res.Body)
      if err != nil {
         return err
      }
      //追加新的返回内容
      newPayLoad := []byte("hello " + string(oldPayload))
      //将新的返回内容重写回到res.Body中
      res.Body = ioutil.NopCloser(bytes.NewBuffer(newPayLoad))
      //由于每一次response的时候都会有一个ContentLength,它是和body中的长度相匹配的,所以也要重新赋值
      res.ContentLength = int64(len(newPayLoad))
      res.Header.Set("Content-Length", fmt.Sprint(len(newPayLoad)))
   }
   return nil
}
补充知识:特殊Head头

ReverseProxy内部源码牵扯到关于head头的逻辑

  • 「X-Forwarded-For」

​ 标志「我」的ip对应到的每一个反向代理服务器的ip做一个列表的呈现。记录最后直连实际服务器之前,整个代理过程,但是它可能会被伪造

如图:每经过一个服务器,它都会在X-Forwarded-For中记录之前经过的服务器ip,但是在经过的服务器中,可能会被伪造,将当前服务器之前的ip进行修改的操作,但是在当前服务器的前一个ip一定是正确的无法伪造。

image-20221010161838704

但是在内网中,若我们只拿到前一个ip,也是没有什么很大的实际作用的。所以就引出了下一个head头X-Real-IP

  • 「X-Real-IP」:「我」实际请求的IP的标记

    拿到真实的服务器ip;每过一层代理都会被覆盖掉,因此只需要第一代理设置并转发,后续代理只用转发即可,如此一来,最后的实际服务器中获取到的X-Real-IP就是真实的客户端ip了,不会是伪造的。

image-20221010162543387

  • 「Connection」:标记「我」请求的时候状态是关闭还是长连接还是一个升级连接
  • 「TE」:「我」希望的传输类型是什么,是一个请求的head头
  • 「Trailer」:返回的head头,标志为允许发送方在消息后面添加的一些原信息,比如超时连接时间等等
补充知识:请求升级handleUpgradeResponse
func (p *ReverseProxy) handleUpgradeResponse(rw http.ResponseWriter, req *http.Request, res *http.Response) {
    //比对上游和下游的协议,判断是否都升级成功
   reqUpType := upgradeType(req.Header)
   resUpType := upgradeType(res.Header)
   if !ascii.IsPrint(resUpType) { 
      p.getErrorHandler()(rw, req, fmt.Errorf("backend tried to switch to invalid protocol %q", resUpType))
   }
   if !ascii.EqualFold(reqUpType, resUpType) {
      p.getErrorHandler()(rw, req, fmt.Errorf("backend tried to switch protocol %q when %q was requested", resUpType, reqUpType))
      return
   }
	//劫持当前http,通过向下转型的方式获得connection
   hj, ok := rw.(http.Hijacker)
   if !ok {
      p.getErrorHandler()(rw, req, fmt.Errorf("can't switch protocols using non-Hijacker ResponseWriter type %T", rw))
      return
   }
   backConn, ok := res.Body.(io.ReadWriteCloser)
   if !ok {
      p.getErrorHandler()(rw, req, fmt.Errorf("internal error: 101 switching protocols response with non-writable body"))
      return
   }

   backConnCloseCh := make(chan bool)
   go func() {
      // Ensure that the cancellation of a request closes the backend.
      // See issue https://golang.org/issue/35559.
      select {
      case <-req.Context().Done():
      case <-backConnCloseCh:
      }
      backConn.Close()
   }()

   defer close(backConnCloseCh)

   conn, brw, err := hj.Hijack()
   if err != nil {
      p.getErrorHandler()(rw, req, fmt.Errorf("Hijack failed on protocol switch: %v", err))
      return
   }
   defer conn.Close()

   copyHeader(rw.Header(), res.Header)

   res.Header = rw.Header()
    //将response的body赋值为空,只写入头部信息
   res.Body = nil
    //将下游的数据写入response里面
   if err := res.Write(brw); err != nil {
      p.getErrorHandler()(rw, req, fmt.Errorf("response write: %v", err))
      return
   }
    //刷新写入状态
   if err := brw.Flush(); err != nil {
      p.getErrorHandler()(rw, req, fmt.Errorf("response flush: %v", err))
      return
   }
   errc := make(chan error, 1)
   //升级成功,但是还有保持一直维持的状态
    //交换协议,一直维持互相拷贝,直到一方报错,返回
   spc := switchProtocolCopier{user: conn, backend: backConn}
   go spc.copyToBackend(errc)
   go spc.copyFromBackend(errc)
   <-errc
   return
}
拓展ReverseProxy的功能(又要新开一篇了)
  • 负载均衡
    • 随机
    • 轮询
    • 加权轮询
    • 一致性哈希
  • 中间件支持
    • 基于路由,对请求进行拦截,校验这一类的操作
  • 限流、熔断
    • 服务高可用策略
  • 权限认证
    • IP黑白名单,jwt等验证规则
  • 数据统计
    • 单机数据统计
    • 分布式数据统计
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>