OpenYurt之Tunnel-Agent源码分析

Posted by 胡伟煌 on 2023-05-10

1. Tunnel-Agent简介

tunnel-agent是通过daemonset部署在每个worker节点,通过grpc协议与云端的tunnel-server建立连接。以下分析tunnel-agent的源码逻辑。

常用的启动参数:

1
2
3
4
5
6
7
- args:
- --node-name=$(NODE_NAME)
- --node-ip=$(POD_IP)
- --tunnelserver-addr=tunnel-server-address[ip:port]
- --v=2
command:
- yurt-tunnel-agent

2. NewYurttunnelAgentCommand

NewYurttunnelAgentCommand还是常用命令代码三板斧,此处不做展开,直接分析Run函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// NewYurttunnelAgentCommand creates a new yurttunnel-agent command
func NewYurttunnelAgentCommand(stopCh <-chan struct{}) *cobra.Command {
agentOptions := options.NewAgentOptions()

// 已经删除非重要的代码
cmd := &cobra.Command{
RunE: func(c *cobra.Command, args []string) error {
cfg, err := agentOptions.Config()
if err := Run(cfg.Complete(), stopCh); err != nil {
return err
}
},
}

agentOptions.AddFlags(cmd.Flags())
return cmd
}

3. Run

Run函数即启动一个agent服务,主要包含以下几个步骤:

  1. 先获取配置项tunnelserver-addr中的地址,如果地址不存在,则获取x-tunnel-server-svc的service 地址。(说明:一般情况下,tunnel-server跟agent不在同一个网络域,因此网络会不通,所以一般需要配置独立且可连通的地址,可以通过Nginx转发)

  2. agent通过host的网络模式运行在宿主机上,启动证书manager。并等待证书生成。

  3. 生成连接tunnel-server的证书。

  4. 启动 yurttunnel-agent。

  5. 启动meta server。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// Run starts the yurttunel-agent
func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error {
// 1. get the address of the yurttunnel-server
tunnelServerAddr = cfg.TunnelServerAddr
if tunnelServerAddr == "" {
if tunnelServerAddr, err = serveraddr.GetTunnelServerAddr(cfg.Client); err != nil {
return err
}
}

// 2. create a certificate manager
// As yurttunnel-agent will run on the edge node with Host network mode,
// we can use the status.podIP as the node IP
nodeIP := os.Getenv(constants.YurttunnelAgentPodIPEnv)

agentCertMgr, err = certfactory.NewCertManagerFactory(cfg.Client).New(&certfactory.CertManagerConfig{
ComponentName: projectinfo.GetAgentName(),
CertDir: cfg.CertDir,
SignerName: certificatesv1.KubeAPIServerClientSignerName,
CommonName: constants.YurtTunnelAgentCSRCN,
Organizations: []string{constants.YurtTunnelCSROrg},
DNSNames: []string{os.Getenv("NODE_NAME")},
IPs: []net.IP{net.ParseIP(nodeIP)},
})

agentCertMgr.Start()

// 2.1. waiting for the certificate is generated
_ = wait.PollUntil(5*time.Second, func() (bool, error) {
if agentCertMgr.Current() != nil {
return true, nil
}
klog.Infof("certificate %s not signed, waiting...",
projectinfo.GetAgentName())
return false, nil
}, stopCh)

// 3. generate a TLS configuration for securing the connection to server
tlsCfg, err := certmanager.GenTLSConfigUseCertMgrAndCA(agentCertMgr,
tunnelServerAddr, constants.YurttunnelCAFile)


// 4. start the yurttunnel-agent
ta := agent.NewTunnelAgent(tlsCfg, tunnelServerAddr, cfg.NodeName, cfg.AgentIdentifiers)
ta.Run(stopCh)

// 5. start meta server
util.RunMetaServer(cfg.AgentMetaAddr)

<-stopCh
return nil
}

4. TunnelAgent

TunnelAgent与tunnel-server建立tunnel,接收server的请求,并转发给kubelet。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// TunnelAgent sets up tunnel to TunnelServer, receive requests
// from tunnel, and forwards requests to kubelet
type TunnelAgent interface {
Run(<-chan struct{})
}

// NewTunnelAgent generates a new TunnelAgent
func NewTunnelAgent(tlsCfg *tls.Config,
tunnelServerAddr, nodeName, agentIdentifiers string) TunnelAgent {
ata := anpTunnelAgent{
tlsCfg: tlsCfg,
tunnelServerAddr: tunnelServerAddr,
nodeName: nodeName,
agentIdentifiers: agentIdentifiers,
}

return &ata
}

5. anpTunnelAgent.Run

anpTunnelAgent使用apiserver-network-proxy包来实现tunnel逻辑。项目具体参考:https://github.com/kubernetes-sigs/apiserver-network-proxy)

代码:/pkg/yurttunnel/agent/anpagent.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// RunAgent runs the yurttunnel-agent which will try to connect yurttunnel-server
func (ata *anpTunnelAgent) Run(stopChan <-chan struct{}) {
dialOption := grpc.WithTransportCredentials(credentials.NewTLS(ata.tlsCfg))
cc := &anpagent.ClientSetConfig{
Address: ata.tunnelServerAddr,
AgentID: ata.nodeName,
AgentIdentifiers: ata.agentIdentifiers,
SyncInterval: 5 * time.Second,
ProbeInterval: 5 * time.Second,
DialOptions: []grpc.DialOption{dialOption},
ServiceAccountTokenPath: "",
}

cs := cc.NewAgentClientSet(stopChan)
cs.Serve()
klog.Infof("start serving grpc request redirected from %s: %s",
projectinfo.GetServerName(), ata.tunnelServerAddr)
}

以下是apiserver-network-proxy的源码分析,待补充。

参考:



支付宝打赏 微信打赏

赞赏一下