本文分析
yurthub
源码,第一部分。本文以commit id:
180282663457080119a1bc6076cce20c922b5c50
, 对应版本tag:v1.2.1
的源码分析yurthub的实现逻辑。
yurthub是部署在每个边缘节点上用来实现边缘自治的组件。在云边通信正常的情况下实现apiserver的请求转发,断网的情况下通过本地的缓存数据保证节点上容器的正常运行。
基本架构图:
pkg包中yurthub代码目录结构;
1 | yurthub |
1. NewCmdStartYurtHub
openyurt
的代码风格与k8s的一致,由cmd为入口,pkg为主要的实现逻辑。
以下是cmd的main函数。
1 | func main() { |
main 函数主要创建 NewCmdStartYurtHub 对象。NewCmd的函数一般都包含以下的几个部分,运行顺序从上到下:
-
NewYurtHubOptions:创建option参数对象,主要用于flag参数解析到option的结构体。
-
yurtHubOptions.AddFlags(cmd.Flags()):添加AddFlags,设置flag参数信息。
-
yurtHubOptions.Validate():校验flag解析后的option的参数合法性。
-
yurtHubCfg, err := config.Complete(yurtHubOptions):将option的参数转换为config的对象。
-
Run(ctx, yurtHubCfg):基于config执行run函数,运行cmd的核心逻辑。
1 | // NewCmdStartYurtHub creates a *cobra.Command object with default parameters |
以上flag、option、config的构建函数此处不做分析,以下分析run函数的逻辑。
2. Run(ctx, yurtHubCfg)
Run函数部分主要构建了几个manager,每个manager各司其职,负责对应的逻辑。有的manager在该函数中直接构建后执行manager.run的逻辑。有的则作为参数传入下一级函数中再执行manager.run函数。
主要包括以下的manager:
-
transportManager
-
cloudHealthChecker
-
restConfigMgr
-
cacheMgr
-
gcMgr
-
tenantMgr
-
NetworkMgr
每个manager的实现细节此处暂不做分析。
此处先贴一下完整源码,避免读者还需要去翻代码。
1 | // Run runs the YurtHubConfiguration. This should never exit |
除了上述的各种manager的构造及运行外,run函数中还构建了yurtProxyHandler
,最终执行RunYurtHubServers
运行一组不会退出的http server。以下先不对manager的实现做展开,而直接分析RunYurtHubServers的逻辑。RunYurtHubServers的代码在pkg包中。
3. RunYurtHubServers
RunYurtHubServers就是一个传统的http server的运行逻辑,主要包括几个不同类型的http server。http server的运行逻辑可以概括如下:
-
hubServerHandler := mux.NewRouter(): 新建路由创建handler
-
registerHandlers(hubServerHandler, cfg, rest): 注册路由
-
YurtHubServerServing.Serve:执行http server.Serve函数启动一个server服务。
http server分为两类:
-
yurthub http server: yurthub metrics, healthz的接口。
-
yurthub proxy server: 代理kube-apiserver的请求。
3.1. YurtHubServerServing
1 | hubServerHandler := mux.NewRouter() |
registerHandlers的路由内容如下:
1 | // registerHandler registers handlers for yurtHubServer, and yurtHubServer can handle requests like profiling, healthz, update token. |
以上路由不做深入分析。
3.2. YurtHubProxyServerServing
YurtHubProxyServerServing主要代理kube-apiserver的转发请求。
1 | // start yurthub proxy servers for forwarding requests to cloud kube-apiserver |
以下分析yurtProxyHandler的逻辑。
3.3. NewYurtReverseProxyHandler
NewYurtReverseProxyHandler主要创建了http handler 代理所有转发请求。
1、创建Load Balancer,主要用来转发apiserver的请求。
1 | lb, err := remote.NewLoadBalancer( |
2、创建local Proxy,主要用来转发本地缓存的请求。
1 | // When yurthub works in Edge mode, we may use local proxy or pool proxy to handle |
3、创建yurtReverseProxy
1 | yurtProxy := &yurtReverseProxy{ |
4. yurtReverseProxy
yurtReverseProxy
主要是作为实现反向代理的结构体。
1 | type yurtReverseProxy struct { |
反向代理服务
1 | func (p *yurtReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { |
核心逻辑:如果是云端apiserver可以访问的通,则通过loadbalaner来转发,否则就通过localproxy来转发读取本地节点的数据。
5. LoadBalancer
LoadBalancer是个本地的负载均衡逻辑,通过轮询的方式去请求cloud的apiserver,当云边网络通信是正常的时候反向代理apiserver的请求,并做本地缓存持久化。断网的时候则读取本地的缓存数据。
-
backends:真实反向代理的后端
-
algo: 处理负载均衡策略,轮询或者按优先级
-
localCacheMgr: 本地缓存管理的manager
1 | type loadBalancer struct { |
5.1. NewLoadBalancer
NewLoadBalancer构建一个remote的反向代理,主要包含添加romote server proxy和处理负载均衡策略两部分。
1、添加多个apiserver的地址,创建remote proxy实现反向代理操作。
1 | backends := make([]*util.RemoteProxy, 0, len(remoteServers)) |
2、处理负载均衡策略:
1 | var algo loadBalancerAlgo |
5.2. loadBalancer.ServeHTTP
loadBalancer实现ServeHTTP的接口,通过负载均衡策略挑选出一个可用的反向代理backend。再调用backend的ServeHTTP方法实现具体的反向代理操作。
1 | // pick a remote proxy based on the load balancing algorithm. |
5.3. errorHandler
如果请求apiserver失败,当verb=get/list, 则读取cache中的内容。
1 | func (lb *loadBalancer) errorHandler(rw http.ResponseWriter, req *http.Request, err error) { |
5.4. QueryCache
1 | // QueryCache get runtime object from backend storage for request |
5.5. 查询storage中的数据
1 | func (cm *cacheManager) queryOneObject(req *http.Request) (runtime.Object, error) { |
目前存储有两种接口实现,一个是本地磁盘存储,一个是etcd存储。以下以磁盘存储为例分析。
1 | // Get will get content from the regular file that specified by key. |
6. RemoteProxy
RemoteProxy实现一个具体的反向代理操作。
字段说明:
-
reverseProxy:http的ReverseProxy
-
remoteServer:apiserver的地址
1 | // RemoteProxy is an reverse proxy for remote server |
实现ReverseProxy的ServeHTTP接口。
1 | func (rp *RemoteProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) { |
实现错误处理的接口。
1 | func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) { |
7. LocalProxy
LocalProxy是一个当云边网络断开的时候,用于处理本地kubelet请求的数据的代理。
字段说明:
- cacheMgr:主要包含本地cache的一个处理管理器。
1 | // LocalProxy is responsible for handling requests when remote servers are unhealthy |
LocalProxy实现ServeHTTP接口,根据不同的k8s请求类型,执行不同的操作:
-
watch:lp.localWatch(w, req)
-
create:lp.localPost(w, req)
-
delete, deletecollection: localDelete(w, req)
-
list., get, update:lp.localReqCache(w, req)
1 | // ServeHTTP implements http.Handler for LocalProxy |
7.1. localReqCache
当边缘网络断连的时候,kubelet执行get list的操作时,通过localReqCache请求本地缓存的数据,返回给kubelet对应的k8s元数据。
1 | // localReqCache handles Get/List/Update requests when remote servers are unhealthy |
核心代码为:
查询本地缓存,返回缓存数据。
1 | obj, err := lp.cacheMgr.QueryCache(req) |
总结
yurthub是实现边缘断网自治的核心组件,核心逻辑是kubelet向apiserver的请求会通过yurhub进行转发,如果apiserver的接口可通,则将请求结果返回,并存储到本地,如果接口不可通,则读取本地的数据。
yurthub本质是一个反向代理的http server, 核心逻辑主要包括 :
1 | - proxy: 反向代理的实现 |
参考:
赞赏一下