本文基于 commit id 180282663457080119a1bc6076cce20c922b5c50(对应版本 tag:v1.2.1)的源码,分析 tunnel-server 的实现逻辑。
1. Tunnel-server简介
云与边一般位于不同网络平面,同时边缘节点普遍位于防火墙内部,采用云(中心)边协同架构,将导致原生 K8s 系统的运维监控能力面临如下挑战:
- K8s 原生运维能力缺失(如 kubectl logs/exec 等无法执行)
- 社区主流监控运维组件无法工作(如 Prometheus/metrics-server )
在 OpenYurt 中,引入了专门的组件 YurtTunnel 负责解决云边通信问题。反向通道是解决跨网络通信的一种常见方式,而 YurtTunnel 的本质也是一个反向通道。 它是一个典型的C/S结构的组件,由部署于云端的 YurtTunnelServer 和部署于边缘节点上的 YurtTunnelAgent组成。
本文主要分析tunnel-server的代理逻辑。
以下是基本架构图:
pkg中的目录结构:
yurttunnel
├── agent # tunnel agent代码
├── constants # 常量值
├── handlerwrapper
├── informers
├── kubernetes # k8s clientset工具包
├── server # 核心代码: tunnel server逻辑
├── trafficforward # iptables和dns操作
└── util
2. NewYurttunnelServerCommand
main函数入口:
// main is the entry point of the tunnel server binary: it builds the server
// command, merges the standard flag set into the command's flags, and
// terminates the process with a fatal log if executing the command fails.
func main() {
	serverCmd := app.NewYurttunnelServerCommand(stop)
	// Expose flags registered on the standard flag package through the command.
	serverCmd.Flags().AddGoFlagSet(flag.CommandLine)

	err := serverCmd.Execute()
	if err == nil {
		return
	}
	klog.Fatalf("%s failed: %s", projectinfo.GetServerName(), err)
}
以下是NewYurttunnelServerCommand构造函数,常见的三件套,不做展开:
读取参数:serverOptions.AddFlags(cmd.Flags())
生成配置:cfg.Complete()
执行Run函数:核心逻辑
// NewYurttunnelServerCommand builds the cobra command that launches the
// tunnel server. The command accepts no positional arguments; its flags are
// registered from the server options. When executed, it derives a config
// from the options, completes it and passes it to Run together with stopCh.
// NOTE: this is an abridged excerpt; the "..." line stands for omitted code.
func NewYurttunnelServerCommand(stopCh <-chan struct{}) *cobra.Command {
serverOptions := options.NewServerOptions()
cmd := &cobra.Command{
Use: "Launch " + projectinfo.GetServerName(),
Short: projectinfo.GetServerName() + " sends requests to " + projectinfo.GetAgentName(),
RunE: func(c *cobra.Command, args []string) error {
...
// Turn the parsed options into a runtime config.
cfg, err := serverOptions.Config()
if err != nil {
return err
}
// Complete the config and run the server; any error is returned to cobra.
if err := Run(cfg.Complete(), stopCh); err != nil {
return err
}
return nil
},
Args: cobra.NoArgs,
}
// Register the option flags on the command before returning it.
serverOptions.AddFlags(cmd.Flags())
return cmd
}
3. Run(cfg.Complete(), stopCh)
Run函数最终会运行一个tunnel server常驻进程。在此之前,会先完成一些controller和manager的准备工作。
其中包括:
DNS controller
IP table manager
certificate manager
RegisterInformersForTunnelServer
首先是构建并运行上述的manager或controller, 源码中的注释也大概描述了主要流程:
注册tunnel所需的SharedInformerFactory
运行dns controller
运行ip table manager
给tunnel server创建certificate manager
给tunnel proxy client创建certificate manager
创建handler wrappers
等待证书签发,并基于最新证书生成TLS配置
运行tunnel server
以下是部分代码,已删除无关紧要的代码:
// Run starts the yurttunnel-server. It registers the informers, optionally
// starts the DNS controller and the iptables manager, starts two certificate
// managers, builds the handler wrappers, waits until both certificates are
// available, generates the TLS configurations, starts the tunnel server and
// the meta server, and then blocks until stopCh is closed.
// NOTE: this is an abridged excerpt; some call arguments are elided with
// "..." and most of the err values assigned below are not checked here.
func Run(cfg *config.CompletedConfig, stopCh <-chan struct{}) error {
// wg is handed to the iptables manager and waited on before Run returns.
var wg sync.WaitGroup
// register the informers that the tunnel server needs
informers.RegisterInformersForTunnelServer(cfg.SharedInformerFactory)
// 0. start the DNS controller (only when enabled in the config)
if cfg.EnableDNSController {
dnsController, err := dns.NewCoreDNSRecordController(...)
go dnsController.Run(stopCh)
}
// 1. start the IP table manager (only when enabled in the config)
if cfg.EnableIptables {
iptablesMgr, err := iptables.NewIptablesManagerWithIPFamily(...)
wg.Add(1)
go iptablesMgr.Run(stopCh, &wg)
}
// 2. create a certificate manager for the tunnel server
certManagerFactory := certfactory.NewCertManagerFactory(cfg.Client)
ips, dnsNames, err := getTunnelServerIPsAndDNSNamesBeforeInformerSynced(cfg.Client, stopCh)
serverCertMgr, err := certManagerFactory.New(...)
serverCertMgr.Start()
// 3. create a certificate manager for the tunnel proxy client
tunnelProxyCertMgr, err := certManagerFactory.New(...)
tunnelProxyCertMgr.Start()
// 4. create handler wrappers
mInitializer := initializer.NewMiddlewareInitializer(cfg.SharedInformerFactory)
wrappers, err := wraphandler.InitHandlerWrappers(mInitializer, cfg.IsIPv6())
// after all informers have been configured, start the shared informer factory
cfg.SharedInformerFactory.Start(stopCh)
// 5. wait for the certificates to be generated; the result of PollUntil is
// discarded
_ = wait.PollUntil(5*time.Second, func() (bool, error) {
// keep polling until both certificates are available
if serverCertMgr.Current() != nil && tunnelProxyCertMgr.Current() != nil {
return true, nil
}
klog.Infof("waiting for the master to sign the %s certificate", projectinfo.GetServerName())
return false, nil
}, stopCh)
// 6. generate the TLS configurations based on the latest certificates: one
// for the server side and one for the proxy client side
tlsCfg, err := certmanager.GenTLSConfigUseCurrentCertAndCertPool(serverCertMgr.Current, cfg.RootCert, "server")
proxyClientTlsCfg, err := certmanager.GenTLSConfigUseCurrentCertAndCertPool(tunnelProxyCertMgr.Current, cfg.RootCert, "client")
// 7. start the server
ts := server.NewTunnelServer(
cfg.EgressSelectorEnabled,
cfg.InterceptorServerUDSFile,
cfg.ListenAddrForMaster,
cfg.ListenInsecureAddrForMaster,
cfg.ListenAddrForAgent,
cfg.ServerCount,
tlsCfg,
proxyClientTlsCfg,
wrappers,
cfg.ProxyStrategy)
if err := ts.Run(); err != nil {
return err
}
// 8. start meta server
util.RunMetaServer(cfg.ListenMetaAddr)
// block until a stop is requested, then wait for the goroutines tracked by wg
<-stopCh
wg.Wait()
return nil
}
3. TunnelServer
anpTunnelServer实现了TunnelServer接口,以下分析其Run方法。
Run方法主要启动了三个server:
proxyServer(proxier):负责将请求重定向到对应的tunnel agent
MasterServer:以经过handler wrappers包装的RequestInterceptor作为handler,处理来自云端(kube-apiserver)的请求
AgentServer:主要运行一个grpc server与tunnel agent连接,即云边反向隧道
// Run runs the yurttunnel-server. It creates one ANP proxy server and then
// starts the proxier, the master server and the agent server around it.
// NOTE: this is an abridged excerpt; the errors returned by WrapHandler and
// by the run* helpers are assigned but not checked here.
func (ats *anpTunnelServer) Run() error {
// A freshly generated UUID string is passed as the first argument
// (presumably the server ID; confirm against anpserver.NewProxyServer).
proxyServer := anpserver.NewProxyServer(uuid.New().String(),
[]anpserver.ProxyStrategy{anpserver.ProxyStrategy(ats.proxyStrategy)},
ats.serverCount,
&anpserver.AgentTokenAuthenticationOptions{})
// 1. start the proxier
proxierErr := runProxier(
&anpserver.Tunnel{Server: proxyServer},
ats.egressSelectorEnabled,
ats.interceptorServerUDSFile,
ats.tlsCfg)
// Wrap the request interceptor with the configured handler wrappers; the
// result is the handler used by the master server below.
wrappedHandler, err := wh.WrapHandler(
NewRequestInterceptor(ats.interceptorServerUDSFile, ats.proxyClientTlsCfg),
ats.wrappers,
)
// 2. start the master server
masterServerErr := runMasterServer(
wrappedHandler,
ats.egressSelectorEnabled,
ats.serverMasterAddr,
ats.serverMasterInsecureAddr,
ats.tlsCfg)
// 3. start the agent server; it shares proxyServer with the proxier
agentServerErr := runAgentServer(ats.tlsCfg, ats.serverAgentAddr, proxyServer)
return nil
}
4. runAgentServer
runAgentServer主要运行一个grpc server与edge端的agent形成grpc连接。
// runAgentServer runs a grpc server that handles connections from the
// yurttunnel-agent.
// NOTE: the agent server is responsible for managing the grpc connections
// between yurttunnel-server and yurttunnel-agent, while the proxy server is
// responsible for redirecting requests to the corresponding yurttunnel-agent.
//
// tlsCfg provides the TLS credentials of the grpc server, agentServerAddr is
// the TCP address to listen on, and proxyServer is registered as the agent
// service implementation. An error is returned only when listening on
// agentServerAddr fails; serving happens in a background goroutine.
func runAgentServer(tlsCfg *tls.Config,
	agentServerAddr string,
	proxyServer *anpserver.ProxyServer) error {
	serverOption := grpc.Creds(credentials.NewTLS(tlsCfg))

	ka := keepalive.ServerParameters{
		// Ping the client if it is idle for `Time` seconds to ensure the
		// connection is still active
		Time: constants.YurttunnelANPGrpcKeepAliveTimeSec * time.Second,
		// Wait `Timeout` second for the ping ack before assuming the
		// connection is dead
		Timeout: constants.YurttunnelANPGrpcKeepAliveTimeoutSec * time.Second,
	}

	grpcServer := grpc.NewServer(serverOption,
		grpc.KeepaliveParams(ka))

	anpagent.RegisterAgentServiceServer(grpcServer, proxyServer)

	listener, err := net.Listen("tcp", agentServerAddr)
	if err != nil {
		return fmt.Errorf("fail to listen to agent on %s: %w", agentServerAddr, err)
	}
	// Only announce that agents are being served once the listener exists.
	klog.Info("start handling connection from agents")

	go func() {
		// Surface the serve error instead of dropping it silently.
		if err := grpcServer.Serve(listener); err != nil {
			klog.Errorf("agent server stopped serving on %s: %v", agentServerAddr, err)
		}
	}()
	return nil
}
5. Interceptor
Interceptor(请求拦截器)拦截kube-apiserver的请求转发给tunnel,通过tunnel请求kubelet。
流程图:
// NewRequestInterceptor creates an interceptor object that intercepts
// requests from kube-apiserver.
//
// udsSockFile is the unix domain socket of the proxy that every connection is
// dialed through, and cfg is the TLS configuration used when a TLS session has
// to be set up over the tunnel. nil is returned when udsSockFile is empty or
// cfg is nil.
//
// NOTE: cfg is modified in place: InsecureSkipVerify is set to true, so the
// certificate presented by the peer at the other end of the tunnel is not
// verified.
func NewRequestInterceptor(udsSockFile string, cfg *tls.Config) *RequestInterceptor {
	if len(udsSockFile) == 0 || cfg == nil {
		return nil
	}

	cfg.InsecureSkipVerify = true

	// contextDialer opens a connection to addr through the proxy: it dials the
	// unix socket, issues an HTTP CONNECT carrying the supported headers and,
	// when isTLS is set, performs a TLS handshake over the established tunnel.
	contextDialer := func(addr string, header http.Header, isTLS bool) (net.Conn, error) {
		klog.V(4).Infof("Sending request to %q.", addr)
		proxyConn, err := net.Dial("unix", udsSockFile)
		if err != nil {
			return nil, fmt.Errorf("dialing proxy %q failed: %w", udsSockFile, err)
		}

		// Forward only the headers listed in supportedHeaders, and only when
		// they carry a value.
		var connectHeaders string
		for _, h := range supportedHeaders {
			if v := header.Get(h); len(v) != 0 {
				connectHeaders = fmt.Sprintf("%s\r\n%s: %s", connectHeaders, h, v)
			}
		}

		// A failed write means the CONNECT request never reached the proxy;
		// report it directly instead of waiting for the response read to fail.
		if _, err := fmt.Fprintf(proxyConn, "CONNECT %s HTTP/1.1\r\nHost: localhost%s\r\n\r\n", addr, connectHeaders); err != nil {
			proxyConn.Close()
			return nil, fmt.Errorf("sending CONNECT request for %s via proxy %s failed: %w", addr, udsSockFile, err)
		}

		br := newBufioReader(proxyConn)
		defer putBufioReader(br)
		res, err := http.ReadResponse(br, nil)
		if err != nil {
			proxyConn.Close()
			return nil, fmt.Errorf("reading HTTP response from CONNECT to %s via proxy %s failed: %w", addr, udsSockFile, err)
		}
		if res.StatusCode != http.StatusOK {
			proxyConn.Close()
			return nil, fmt.Errorf("proxy error from %s while dialing %s, code %d: %v", udsSockFile, addr, res.StatusCode, res.Status)
		}

		// if the request scheme is https, setup a tls connection over the
		// proxy tunnel (i.e. interceptor <--tls--> kubelet)
		if isTLS {
			tlsTunnelConn := tls.Client(proxyConn, cfg)
			if err := tlsTunnelConn.Handshake(); err != nil {
				proxyConn.Close()
				return nil, fmt.Errorf("fail to setup TLS handshake through the Tunnel: %w", err)
			}
			klog.V(4).Infof("successfully setup TLS connection to %q with headers: %s", addr, connectHeaders)
			return tlsTunnelConn, nil
		}
		klog.V(2).Infof("successfully setup connection to %q with headers: %q", addr, connectHeaders)
		return proxyConn, nil
	}

	return &RequestInterceptor{
		contextDialer: contextDialer,
	}
}
参考: