扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
这篇文章主要讲解了“kube-proxy怎么使用”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“kube-proxy怎么使用”吧!
成都创新互联专注于海南网站建设服务及定制,我们拥有丰富的企业做网站经验。 热诚为您提供海南营销型网站建设,海南网站制作、海南网页设计、海南网站官网定制、成都小程序开发服务,打造海南网络公司原创品牌,更为您提供海南网站排名全网营销落地服务。
##源码目录结构分析
cmd/kube-proxy //负责kube-proxy的创建,启动的入口 . ├── app │ ├── conntrack.go //linux kernel的nf_conntrack-sysctl的interface定义,更多关于conntracker的定义请看https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt │ ├── options │ │ └── options.go //kube-proxy的参数定义ProxyServerConfig及相关方法 │ ├── server.go //ProxyServer结构定义及其创建(NewProxyServerDefault)和运行(Run)的方法。 │ └── server_test.go └── proxy.go //kube-proxy的main方法 pkg/proxy . ├── OWNERS ├── config │ ├── api.go //给proxy配置Service和Endpoint的Reflectors和Cache.Store │ ├── api_test.go │ ├── config.go //定义ServiceUpdate,EndpointUpdate结构体以及ServiceConfigHandler,EndpointConfigHandler来处理Service和Endpoint的Update │ ├── config_test.go │ └── doc.go ├── doc.go ├── healthcheck //负责service listener和endpoint的health check,add/delete请求。 │ ├── api.go │ ├── doc.go │ ├── healthcheck.go │ ├── healthcheck_test.go │ ├── http.go │ ├── listener.go │ └── worker.go ├── iptables //proxy mode为iptables的实现 │ ├── proxier.go │ └── proxier_test.go ├── types.go ├── userspace //proxy mode为userspace的实现 │ ├── loadbalancer.go │ ├── port_allocator.go │ ├── port_allocator_test.go │ ├── proxier.go │ ├── proxier_test.go │ ├── proxysocket.go │ ├── rlimit.go │ ├── rlimit_windows.go │ ├── roundrobin.go │ ├── roundrobin_test.go │ └── udp_server.go └── winuserspace //windows OS时,proxy mode为userspace的实现 ├── loadbalancer.go ├── port_allocator.go ├── port_allocator_test.go ├── proxier.go ├── proxier_test.go ├── proxysocket.go ├── roundrobin.go ├── roundrobin_test.go └── udp_server.go
##内部实现模块逻辑图
##源码分析
###main kube-proxy的main入口在:cmd/kube-proxy/proxy.go:39
func main() { //创建kube-proxy的默认config对象 config := options.NewProxyConfig() //用kube-proxy命令行的参数替换默认参数 config.AddFlags(pflag.CommandLine) flag.InitFlags() logs.InitLogs() defer logs.FlushLogs() verflag.PrintAndExitIfRequested() //根据config创建ProxyServer s, err := app.NewProxyServerDefault(config) if err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } //执行Run方法让kube-proxy开始干活了 if err = s.Run(); err != nil { fmt.Fprintf(os.Stderr, "%v\n", err) os.Exit(1) } }
main方法中,我们重点关注app.NewProxyServerDefault(config)创建ProxyServer和Run方法。
###创建ProxyServer NewProxyServerDefault负责根据提供的config参数创建一个新的ProxyServer对象,其代码比较长,逻辑相对复杂,下面会挑重点说一下。
cmd/kube-proxy/app/server.go:131 func NewProxyServerDefault(config *options.ProxyServerConfig) (*ProxyServer, error) { ... // Create a iptables utils. execer := exec.New() if runtime.GOOS == "windows" { netshInterface = utilnetsh.New(execer) } else { dbus = utildbus.New() iptInterface = utiliptables.New(execer, dbus, protocol) } ... //设置OOM_SCORE_ADJ var oomAdjuster *oom.OOMAdjuster if config.OOMScoreAdj != nil { oomAdjuster = oom.NewOOMAdjuster() if err := oomAdjuster.ApplyOOMScoreAdj(0, int(*config.OOMScoreAdj)); err != nil { glog.V(2).Info(err) } } ... // Create a Kube Client ... // 创建event Broadcaster和event recorder hostname := nodeutil.GetHostname(config.HostnameOverride) eventBroadcaster := record.NewBroadcaster() recorder := eventBroadcaster.NewRecorder(v1.EventSource{Component: "kube-proxy", Host: hostname}) //定义proxier和endpointsHandler,分别用于处理services和endpoints的update event。 var proxier proxy.ProxyProvider var endpointsHandler proxyconfig.EndpointsConfigHandler //从config中获取proxy mode proxyMode := getProxyMode(string(config.Mode), client.Core().Nodes(), hostname, iptInterface, iptables.LinuxKernelCompatTester{}) // proxy mode为iptables场景 if proxyMode == proxyModeIPTables { glog.V(0).Info("Using iptables Proxier.") if config.IPTablesMasqueradeBit == nil { // IPTablesMasqueradeBit must be specified or defaulted. return nil, fmt.Errorf("Unable to read IPTablesMasqueradeBit from config") } //调用pkg/proxy/iptables/proxier.go:222中的iptables.NewProxier来创建proxier,赋值给前面定义的proxier和endpointsHandler,表示由该proxier同时负责service和endpoint的event处理。 proxierIPTables, err := iptables.NewProxier(iptInterface, utilsysctl.New(), execer, config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.MasqueradeAll, int(*config.IPTablesMasqueradeBit), config.ClusterCIDR, hostname, getNodeIP(client, hostname)) if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierIPTables endpointsHandler = proxierIPTables // No turning back. Remove artifacts that might still exist from the userspace Proxier. glog.V(0).Info("Tearing down userspace rules.") userspace.CleanupLeftovers(iptInterface) } // proxy mode为userspace场景 else { glog.V(0).Info("Using userspace Proxier.") // This is a proxy.LoadBalancer which NewProxier needs but has methods we don't need for // our config.EndpointsConfigHandler. loadBalancer := userspace.NewLoadBalancerRR() // set EndpointsConfigHandler to our loadBalancer endpointsHandler = loadBalancer var proxierUserspace proxy.ProxyProvider // windows OS场景下,调用pkg/proxy/winuserspace/proxier.go:146的winuserspace.NewProxier来创建proxier。 if runtime.GOOS == "windows" { proxierUserspace, err = winuserspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), netshInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), // TODO @pires replace below with default values, if applicable config.IPTablesSyncPeriod.Duration, config.UDPIdleTimeout.Duration, ) } // linux OS场景下,调用pkg/proxy/userspace/proxier.go:143的userspace.NewProxier来创建proxier。 else { proxierUserspace, err = userspace.NewProxier( loadBalancer, net.ParseIP(config.BindAddress), iptInterface, *utilnet.ParsePortRangeOrDie(config.PortRange), config.IPTablesSyncPeriod.Duration, config.IPTablesMinSyncPeriod.Duration, config.UDPIdleTimeout.Duration, ) } if err != nil { glog.Fatalf("Unable to create proxier: %v", err) } proxier = proxierUserspace // Remove artifacts from the pure-iptables Proxier, if not on Windows. if runtime.GOOS != "windows" { glog.V(0).Info("Tearing down pure-iptables proxy rules.") iptables.CleanupLeftovers(iptInterface) } } // Add iptables reload function, if not on Windows. if runtime.GOOS != "windows" { iptInterface.AddReloadFunc(proxier.Sync) } // Create configs (i.e. Watches for Services and Endpoints) // 创建serviceConfig负责service的watchforUpdates serviceConfig := proxyconfig.NewServiceConfig() //给serviceConfig注册proxier,既添加对应的listener用来处理service update时逻辑。 serviceConfig.RegisterHandler(proxier) // 创建endpointsConfig负责endpoint的watchforUpdates endpointsConfig := proxyconfig.NewEndpointsConfig() //给endpointsConfig注册endpointsHandler,既添加对应的listener用来处理endpoint update时的逻辑。 endpointsConfig.RegisterHandler(endpointsHandler) //NewSourceAPI creates config source that watches for changes to the services and endpoints. //NewSourceAPI通过ListWatch apiserver的Service和endpoint,并周期性的维护serviceStore和endpointStore的更新 proxyconfig.NewSourceAPI( client.Core().RESTClient(), config.ConfigSyncPeriod, serviceConfig.Channel("api"), //Service Update Channel endpointsConfig.Channel("api"), //endpoint update channel ) ... //把前面创建的对象作为参数,构造出ProxyServer对象。 return NewProxyServer(client, config, iptInterface, proxier, eventBroadcaster, recorder, conntracker, proxyMode) }
NewProxyServerDefault中的核心逻辑我都已经在上述代码中添加了注释,其中有几个地方需要我们再深入跟进去看看:proxyconfig.NewServiceConfig,proxyconfig.NewEndpointsConfig,serviceConfig.RegisterHandler,endpointsConfig.RegisterHandler,proxyconfig.NewSourceAPI。
####proxyconfig.NewServiceConfig 我们对ServiceConfig的代码分析一遍,EndpointsConfig的代码则类似。
pkg/proxy/config/config.go:192 func NewServiceConfig() *ServiceConfig { // 创建updates channel updates := make(chan struct{}, 1) // 构建serviceStore对象 store := &serviceStore{updates: updates, services: make(map[string]map[types.NamespacedName]api.Service)} mux := config.NewMux(store) // 新建Broadcaster,在后续的serviceConfig.RegisterHandler会注册该Broadcaster的listener。 bcaster := config.NewBroadcaster() //启动协程,马上开始watch updates channel go watchForUpdates(bcaster, store, updates) return &ServiceConfig{mux, bcaster, store} }
下面我们再跟进watchForUpdates去看看。
pkg/proxy/config/config.go:292 func watchForUpdates(bcaster *config.Broadcaster, accessor config.Accessor, updates <-chan struct{}) { for true { <-updates bcaster.Notify(accessor.MergedState()) } }
watchForUpdates就是一直在watch updates channel,如果有数据,则立刻由该Broadcaster Notify到注册的listeners。 Notify的代码如下,可见,它负责将数据通知给所有的listener,并调用各个listener的OnUpdate方法。
pkg/util/config/config.go:133 // Notify notifies all listeners. func (b *Broadcaster) Notify(instance interface{}) { b.listenerLock.RLock() listeners := b.listeners b.listenerLock.RUnlock() for _, listener := range listeners { listener.OnUpdate(instance) } } func (f ListenerFunc) OnUpdate(instance interface{}) { f(instance) }
####serviceConfig.RegisterHandler 上面分析的proxyconfig.NewServiceConfig负责创建ServiceConfig,开始watch updates channel了,当从channel中取到值的时候,Broadcaster就会通知listener进行处理。serviceConfig.RegisterHandler正是负责给Broadcaster注册listener的,其代码如下。
pkg/proxy/config/config.go:205 func (c *ServiceConfig) RegisterHandler(handler ServiceConfigHandler) { //给ServiceConfig的Broadcaster注册listener。 c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnServiceUpdate()") handler.OnServiceUpdate(instance.([]api.Service)) })) }
上面分析proxyconfig.NewServiceConfig时可知,当从updates channel中取到值的时候,最终会调用对应的ListenerFunc(instance)进行处理,在这里,也就是调用:
func(instance interface{}) { glog.V(3).Infof("Calling handler.OnServiceUpdate()") handler.OnServiceUpdate(instance.([]api.Service)) }
即调用到handler.OnServiceUpdate。每种proxymode对应的proxier都有对应的handler.OnServiceUpdate接口实现,我们以iptables mode为例,看看handler.OnServiceUpdate的实现:
pkg/proxy/iptables/proxier.go:428 func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) { ... proxier.syncProxyRules() proxier.deleteServiceConnections(staleUDPServices.List()) }
因此,最终关键的逻辑都转向了proxier.syncProxyRules(),从我们上面给出的内部模块交互图也能看得出来。对于proxier.syncProxyRules(),我们放到后面来详细讨论,现在你只要知道proxier.syncProxyRules()负责将proxy中缓存的service/endpoint同步更新到iptables中生成对应Chain和NAT Rules。
####proxyconfig.NewEndpointsConfig endpointsConfig的逻辑和serviceConfig的类似,在这里只给出对应代码,不再跟进分析。
pkg/proxy/config/config.go:84 func NewEndpointsConfig() *EndpointsConfig { // The updates channel is used to send interrupts to the Endpoints handler. // It's buffered because we never want to block for as long as there is a // pending interrupt, but don't want to drop them if the handler is doing // work. updates := make(chan struct{}, 1) store := &endpointsStore{updates: updates, endpoints: make(map[string]map[types.NamespacedName]api.Endpoints)} mux := config.NewMux(store) bcaster := config.NewBroadcaster() go watchForUpdates(bcaster, store, updates) return &EndpointsConfig{mux, bcaster, store} }
####endpointsConfig.RegisterHandler
pkg/proxy/config/config.go:97 func (c *EndpointsConfig) RegisterHandler(handler EndpointsConfigHandler) { c.bcaster.Add(config.ListenerFunc(func(instance interface{}) { glog.V(3).Infof("Calling handler.OnEndpointsUpdate()") handler.OnEndpointsUpdate(instance.([]api.Endpoints)) })) }
####proxyconfig.NewSourceAPI
proxyconfig.NewSourceAPI是很关键的,它负责给service updates channel和endpoint updates channel配置数据源,它是通过周期性的List和Watch kube-apiserver中的all service and endpoint来提供数据的,发给对应的channel。默认的List周期是15min,可通过--config-sync-period
修改。下面来看其具体代码:
func NewSourceAPI(c cache.Getter, period time.Duration, servicesChan chan<- ServiceUpdate, endpointsChan chan<- EndpointsUpdate) { servicesLW := cache.NewListWatchFromClient(c, "services", api.NamespaceAll, fields.Everything()) cache.NewReflector(servicesLW, &api.Service{}, NewServiceStore(nil, servicesChan), period).Run() endpointsLW := cache.NewListWatchFromClient(c, "endpoints", api.NamespaceAll, fields.Everything()) cache.NewReflector(endpointsLW, &api.Endpoints{}, NewEndpointsStore(nil, endpointsChan), period).Run() } // NewServiceStore creates an undelta store that expands updates to the store into // ServiceUpdate events on the channel. If no store is passed, a default store will // be initialized. Allows reuse of a cache store across multiple components. func NewServiceStore(store cache.Store, ch chan<- ServiceUpdate) cache.Store { fn := func(objs []interface{}) { var services []api.Service for _, o := range objs { services = append(services, *(o.(*api.Service))) } ch <- ServiceUpdate{Op: SET, Services: services} } if store == nil { store = cache.NewStore(cache.MetaNamespaceKeyFunc) } return &cache.UndeltaStore{ Store: store, PushFunc: fn, } } // NewEndpointsStore creates an undelta store that expands updates to the store into // EndpointsUpdate events on the channel. If no store is passed, a default store will // be initialized. Allows reuse of a cache store across multiple components. func NewEndpointsStore(store cache.Store, ch chan<- EndpointsUpdate) cache.Store { fn := func(objs []interface{}) { var endpoints []api.Endpoints for _, o := range objs { endpoints = append(endpoints, *(o.(*api.Endpoints))) } ch <- EndpointsUpdate{Op: SET, Endpoints: endpoints} } if store == nil { store = cache.NewStore(cache.MetaNamespaceKeyFunc) } return &cache.UndeltaStore{ Store: store, PushFunc: fn, } }
代码很简单,不需要过多解释。
###执行Run开始工作 创建完ProxyServer后,就执行Run方法开始工作了,它主要负责周期性(default 30s)的同步proxy中的services/endpionts到iptables中生成对应Chain and NAT Rules。
cmd/kube-proxy/app/server.go:308 func (s *ProxyServer) Run() error { ... // Start up a webserver if requested if s.Config.HealthzPort > 0 { http.HandleFunc("/proxyMode", func(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "%s", s.ProxyMode) }) configz.InstallHandler(http.DefaultServeMux) go wait.Until(func() { err := http.ListenAndServe(s.Config.HealthzBindAddress+":"+strconv.Itoa(int(s.Config.HealthzPort)), nil) if err != nil { glog.Errorf("Starting health server failed: %v", err) } }, 5*time.Second, wait.NeverStop) } ... // Just loop forever for now... s.Proxier.SyncLoop() return nil }
Run方法关键代码很简单,就是执行对应proxier的SyncLoop()。我们还是以iptables mode为例,看看它是如何实现SyncLoop()的:
pkg/proxy/iptables/proxier.go:416 // SyncLoop runs periodic work. This is expected to run as a goroutine or as the main loop of the app. It does not return. func (proxier *Proxier) SyncLoop() { t := time.NewTicker(proxier.syncPeriod) defer t.Stop() for { <-t.C glog.V(6).Infof("Periodic sync") proxier.Sync() } }
SyncLoop中,通过设置定时器,默认每30s会执行一次proxier.Sync(),可以通过--iptables-sync-period
修改默认时间。那我们继续跟进Sync()的代码:
pkg/proxy/iptables/proxier.go:409 // Sync is called to immediately synchronize the proxier state to iptables func (proxier *Proxier) Sync() { proxier.mu.Lock() defer proxier.mu.Unlock() proxier.syncProxyRules() }
可见,最终还是调用proxier.syncProxyRules()。前一节中创建ProxyServer的分析也是一样,最终watch到service/endpoint有更新时,都会调用到proxier.syncProxyRules()。那下面我们就来看看proxier.syncProxyRules()的代码。
###proxier.syncProxyRules
下面的proxier.syncProxyRules代码是iptables mode对应的实现。userspace mode的代码我就不贴了。
pkg/proxy/iptables/proxier.go:791 // This is where all of the iptables-save/restore calls happen. // The only other iptables rules are those that are setup in iptablesInit() // assumes proxier.mu is held func (proxier *Proxier) syncProxyRules() { if proxier.throttle != nil { proxier.throttle.Accept() } start := time.Now() defer func() { glog.V(4).Infof("syncProxyRules took %v", time.Since(start)) }() // don't sync rules till we've received services and endpoints if !proxier.haveReceivedEndpointsUpdate || !proxier.haveReceivedServiceUpdate { glog.V(2).Info("Not syncing iptables until Services and Endpoints have been received from master") return } glog.V(3).Infof("Syncing iptables rules") // Create and link the kube services chain. { tablesNeedServicesChain := []utiliptables.Table{utiliptables.TableFilter, utiliptables.TableNAT} for _, table := range tablesNeedServicesChain { if _, err := proxier.iptables.EnsureChain(table, kubeServicesChain); err != nil { glog.Errorf("Failed to ensure that %s chain %s exists: %v", table, kubeServicesChain, err) return } } tableChainsNeedJumpServices := []struct { table utiliptables.Table chain utiliptables.Chain }{ {utiliptables.TableFilter, utiliptables.ChainOutput}, {utiliptables.TableNAT, utiliptables.ChainOutput}, {utiliptables.TableNAT, utiliptables.ChainPrerouting}, } comment := "kubernetes service portals" args := []string{"-m", "comment", "--comment", comment, "-j", string(kubeServicesChain)} for _, tc := range tableChainsNeedJumpServices { if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, tc.table, tc.chain, args...); err != nil { glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", tc.table, tc.chain, kubeServicesChain, err) return } } } // Create and link the kube postrouting chain. { if _, err := proxier.iptables.EnsureChain(utiliptables.TableNAT, kubePostroutingChain); err != nil { glog.Errorf("Failed to ensure that %s chain %s exists: %v", utiliptables.TableNAT, kubePostroutingChain, err) return } comment := "kubernetes postrouting rules" args := []string{"-m", "comment", "--comment", comment, "-j", string(kubePostroutingChain)} if _, err := proxier.iptables.EnsureRule(utiliptables.Prepend, utiliptables.TableNAT, utiliptables.ChainPostrouting, args...); err != nil { glog.Errorf("Failed to ensure that %s chain %s jumps to %s: %v", utiliptables.TableNAT, utiliptables.ChainPostrouting, kubePostroutingChain, err) return } } // Get iptables-save output so we can check for existing chains and rules. // This will be a map of chain name to chain with rules as stored in iptables-save/iptables-restore existingFilterChains := make(map[utiliptables.Chain]string) iptablesSaveRaw, err := proxier.iptables.Save(utiliptables.TableFilter) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output existingFilterChains = utiliptables.GetChainLines(utiliptables.TableFilter, iptablesSaveRaw) } existingNATChains := make(map[utiliptables.Chain]string) iptablesSaveRaw, err = proxier.iptables.Save(utiliptables.TableNAT) if err != nil { // if we failed to get any rules glog.Errorf("Failed to execute iptables-save, syncing all rules: %v", err) } else { // otherwise parse the output existingNATChains = utiliptables.GetChainLines(utiliptables.TableNAT, iptablesSaveRaw) } filterChains := bytes.NewBuffer(nil) filterRules := bytes.NewBuffer(nil) natChains := bytes.NewBuffer(nil) natRules := bytes.NewBuffer(nil) // Write table headers. writeLine(filterChains, "*filter") writeLine(natChains, "*nat") // Make sure we keep stats for the top-level chains, if they existed // (which most should have because we created them above). if chain, ok := existingFilterChains[kubeServicesChain]; ok { writeLine(filterChains, chain) } else { writeLine(filterChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeServicesChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(kubeServicesChain)) } if chain, ok := existingNATChains[kubeNodePortsChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(kubeNodePortsChain)) } if chain, ok := existingNATChains[kubePostroutingChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(kubePostroutingChain)) } if chain, ok := existingNATChains[KubeMarkMasqChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(KubeMarkMasqChain)) } // Install the kubernetes-specific postrouting rules. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. writeLine(natRules, []string{ "-A", string(kubePostroutingChain), "-m", "comment", "--comment", `"kubernetes service traffic requiring SNAT"`, "-m", "mark", "--mark", proxier.masqueradeMark, "-j", "MASQUERADE", }...) // Install the kubernetes-specific masquerade mark rule. We use a whole chain for // this so that it is easier to flush and change, for example if the mark // value should ever change. writeLine(natRules, []string{ "-A", string(KubeMarkMasqChain), "-j", "MARK", "--set-xmark", proxier.masqueradeMark, }...) // Accumulate NAT chains to keep. activeNATChains := map[utiliptables.Chain]bool{} // use a map as a set // Accumulate the set of local ports that we will be holding open once this update is complete replacementPortsMap := map[localPort]closeable{} // Build rules for each service. for svcName, svcInfo := range proxier.serviceMap { protocol := strings.ToLower(string(svcInfo.protocol)) // Create the per-service chain, retaining counters if possible. svcChain := servicePortChainName(svcName, protocol) if chain, ok := existingNATChains[svcChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(svcChain)) } activeNATChains[svcChain] = true svcXlbChain := serviceLBChainName(svcName, protocol) if svcInfo.onlyNodeLocalEndpoints { // Only for services with the externalTraffic annotation set to OnlyLocal // create the per-service LB chain, retaining counters if possible. if lbChain, ok := existingNATChains[svcXlbChain]; ok { writeLine(natChains, lbChain) } else { writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain)) } activeNATChains[svcXlbChain] = true } else if activeNATChains[svcXlbChain] { // Cleanup the previously created XLB chain for this service delete(activeNATChains, svcXlbChain) } // Capture the clusterIP. args := []string{ "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s cluster IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "--dport", fmt.Sprintf("%d", svcInfo.port), } if proxier.masqueradeAll { writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) } if len(proxier.clusterCIDR) > 0 { writeLine(natRules, append(args, "! -s", proxier.clusterCIDR, "-j", string(KubeMarkMasqChain))...) } writeLine(natRules, append(args, "-j", string(svcChain))...) // Capture externalIPs. for _, externalIP := range svcInfo.externalIPs { // If the "external" IP happens to be an IP that is local to this // machine, hold the local port open so no other process can open it // (because the socket might open but it would never work). if local, err := isLocalIP(externalIP); err != nil { glog.Errorf("can't determine if IP is local, assuming not: %v", err) } else if local { lp := localPort{ desc: "externalIP for " + svcName.String(), ip: externalIP, port: svcInfo.port, protocol: protocol, } if proxier.portsMap[lp] != nil { glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { glog.Errorf("can't open %s, skipping this externalIP: %v", lp.String(), err) continue } replacementPortsMap[lp] = socket } } // We're holding the port, so it's OK to install iptables rules. args := []string{ "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s external IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", externalIP), "--dport", fmt.Sprintf("%d", svcInfo.port), } // We have to SNAT packets to external IPs. writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Allow traffic for external IPs that does not come from a bridge (i.e. not from a container) // nor from a local process to be forwarded to the service. // This rule roughly translates to "all traffic from off-machine". // This is imperfect in the face of network plugins that might not use a bridge, but we can revisit that later. externalTrafficOnlyArgs := append(args, "-m", "physdev", "!", "--physdev-is-in", "-m", "addrtype", "!", "--src-type", "LOCAL") writeLine(natRules, append(externalTrafficOnlyArgs, "-j", string(svcChain))...) dstLocalOnlyArgs := append(args, "-m", "addrtype", "--dst-type", "LOCAL") // Allow traffic bound for external IPs that happen to be recognized as local IPs to stay local. // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) } // Capture load-balancer ingress. for _, ingress := range svcInfo.loadBalancerStatus.Ingress { if ingress.IP != "" { // create service firewall chain fwChain := serviceFirewallChainName(svcName, protocol) if chain, ok := existingNATChains[fwChain]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(fwChain)) } activeNATChains[fwChain] = true // The service firewall rules are created based on ServiceSpec.loadBalancerSourceRanges field. // This currently works for loadbalancers that preserves source ips. // For loadbalancers which direct traffic to service NodePort, the firewall rules will not apply. args := []string{ "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", ingress.IP), "--dport", fmt.Sprintf("%d", svcInfo.port), } // jump to service firewall chain writeLine(natRules, append(args, "-j", string(fwChain))...) args = []string{ "-A", string(fwChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s loadbalancer IP"`, svcName.String()), } // Each source match rule in the FW chain may jump to either the SVC or the XLB chain chosenChain := svcXlbChain // If we are proxying globally, we need to masquerade in case we cross nodes. // If we are proxying only locally, we can retain the source IP. if !svcInfo.onlyNodeLocalEndpoints { writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) chosenChain = svcChain } if len(svcInfo.loadBalancerSourceRanges) == 0 { // allow all sources, so jump directly to the KUBE-SVC or KUBE-XLB chain writeLine(natRules, append(args, "-j", string(chosenChain))...) } else { // firewall filter based on each source range allowFromNode := false for _, src := range svcInfo.loadBalancerSourceRanges { writeLine(natRules, append(args, "-s", src, "-j", string(chosenChain))...) // ignore error because it has been validated _, cidr, _ := net.ParseCIDR(src) if cidr.Contains(proxier.nodeIP) { allowFromNode = true } } // generally, ip route rule was added to intercept request to loadbalancer vip from the // loadbalancer's backend hosts. In this case, request will not hit the loadbalancer but loop back directly. // Need to add the following rule to allow request on host. if allowFromNode { writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", ingress.IP), "-j", string(chosenChain))...) } } // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP writeLine(natRules, append(args, "-j", string(KubeMarkDropChain))...) } } // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but // with just 2 rules it ends up being a waste and a cognitive burden. if svcInfo.nodePort != 0 { // Hold the local port open so no other process can open it // (because the socket might open but it would never work). lp := localPort{ desc: "nodePort for " + svcName.String(), ip: "", port: svcInfo.nodePort, protocol: protocol, } if proxier.portsMap[lp] != nil { glog.V(4).Infof("Port %s was open before and is still needed", lp.String()) replacementPortsMap[lp] = proxier.portsMap[lp] } else { socket, err := proxier.portMapper.OpenLocalPort(&lp) if err != nil { glog.Errorf("can't open %s, skipping this nodePort: %v", lp.String(), err) continue } if lp.protocol == "udp" { proxier.clearUdpConntrackForPort(lp.port) } replacementPortsMap[lp] = socket } // We're holding the port, so it's OK to install iptables rules. args := []string{ "-A", string(kubeNodePortsChain), "-m", "comment", "--comment", svcName.String(), "-m", protocol, "-p", protocol, "--dport", fmt.Sprintf("%d", svcInfo.nodePort), } if !svcInfo.onlyNodeLocalEndpoints { // Nodeports need SNAT, unless they're local. writeLine(natRules, append(args, "-j", string(KubeMarkMasqChain))...) // Jump to the service chain. writeLine(natRules, append(args, "-j", string(svcChain))...) } else { // TODO: Make all nodePorts jump to the firewall chain. // Currently we only create it for loadbalancers (#33586). writeLine(natRules, append(args, "-j", string(svcXlbChain))...) } } // If the service has no endpoints then reject packets. if len(proxier.endpointsMap[svcName]) == 0 { writeLine(filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcName.String()), "-m", protocol, "-p", protocol, "-d", fmt.Sprintf("%s/32", svcInfo.clusterIP.String()), "--dport", fmt.Sprintf("%d", svcInfo.port), "-j", "REJECT", ) continue } // Generate the per-endpoint chains. We do this in multiple passes so we // can group rules together. // These two slices parallel each other - keep in sync endpoints := make([]*endpointsInfo, 0) endpointChains := make([]utiliptables.Chain, 0) for _, ep := range proxier.endpointsMap[svcName] { endpoints = append(endpoints, ep) endpointChain := servicePortEndpointChainName(svcName, protocol, ep.ip) endpointChains = append(endpointChains, endpointChain) // Create the endpoint chain, retaining counters if possible. if chain, ok := existingNATChains[utiliptables.Chain(endpointChain)]; ok { writeLine(natChains, chain) } else { writeLine(natChains, utiliptables.MakeChainLine(endpointChain)) } activeNATChains[endpointChain] = true } // First write session affinity rules, if applicable. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { for _, endpointChain := range endpointChains { writeLine(natRules, "-A", string(svcChain), "-m", "comment", "--comment", svcName.String(), "-m", "recent", "--name", string(endpointChain), "--rcheck", "--seconds", fmt.Sprintf("%d", svcInfo.stickyMaxAgeMinutes*60), "--reap", "-j", string(endpointChain)) } } // Now write loadbalancing & DNAT rules. n := len(endpointChains) for i, endpointChain := range endpointChains { // Balancing rules in the per-service chain. args := []string{ "-A", string(svcChain), "-m", "comment", "--comment", svcName.String(), } if i < (n - 1) { // Each rule is a probabilistic match. args = append(args, "-m", "statistic", "--mode", "random", "--probability", fmt.Sprintf("%0.5f", 1.0/float64(n-i))) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) writeLine(natRules, args...) // Rules in the per-endpoint chain. args = []string{ "-A", string(endpointChain), "-m", "comment", "--comment", svcName.String(), } // Handle traffic that loops back to the originator with SNAT. writeLine(natRules, append(args, "-s", fmt.Sprintf("%s/32", strings.Split(endpoints[i].ip, ":")[0]), "-j", string(KubeMarkMasqChain))...) // Update client-affinity lists. if svcInfo.sessionAffinityType == api.ServiceAffinityClientIP { args = append(args, "-m", "recent", "--name", string(endpointChain), "--set") } // DNAT to final destination. args = append(args, "-m", protocol, "-p", protocol, "-j", "DNAT", "--to-destination", endpoints[i].ip) writeLine(natRules, args...) } // The logic below this applies only if this service is marked as OnlyLocal if !svcInfo.onlyNodeLocalEndpoints { continue } // Now write ingress loadbalancing & DNAT rules only for services that have a localOnly annotation // TODO - This logic may be combinable with the block above that creates the svc balancer chain localEndpoints := make([]*endpointsInfo, 0) localEndpointChains := make([]utiliptables.Chain, 0) for i := range endpointChains { if endpoints[i].localEndpoint { // These slices parallel each other; must be kept in sync localEndpoints = append(localEndpoints, endpoints[i]) localEndpointChains = append(localEndpointChains, endpointChains[i]) } } // First rule in the chain redirects all pod -> external vip traffic to the // Service's ClusterIP instead. This happens whether or not we have local // endpoints; only if clusterCIDR is specified if len(proxier.clusterCIDR) > 0 { args = []string{ "-A", string(svcXlbChain), "-m", "comment", "--comment", fmt.Sprintf(`"Redirect pods trying to reach external loadbalancer VIP to clusterIP"`), "-s", proxier.clusterCIDR, "-j", string(svcChain), } writeLine(natRules, args...) } numLocalEndpoints := len(localEndpointChains) if numLocalEndpoints == 0 { // Blackhole all traffic since there are no local endpoints args := []string{ "-A", string(svcXlbChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no local endpoints"`, svcName.String()), "-j", string(KubeMarkDropChain), } writeLine(natRules, args...) } else { // Setup probability filter rules only over local endpoints for i, endpointChain := range localEndpointChains { // Balancing rules in the per-service chain. args := []string{ "-A", string(svcXlbChain), "-m", "comment", "--comment", fmt.Sprintf(`"Balancing rule %d for %s"`, i, svcName.String()), } if i < (numLocalEndpoints - 1) { // Each rule is a probabilistic match. args = append(args, "-m", "statistic", "--mode", "random", "--probability", fmt.Sprintf("%0.5f", 1.0/float64(numLocalEndpoints-i))) } // The final (or only if n == 1) rule is a guaranteed match. args = append(args, "-j", string(endpointChain)) writeLine(natRules, args...) } } } // Delete chains no longer in use. for chain := range existingNATChains { if !activeNATChains[chain] { chainString := string(chain) if !strings.HasPrefix(chainString, "KUBE-SVC-") && !strings.HasPrefix(chainString, "KUBE-SEP-") && !strings.HasPrefix(chainString, "KUBE-FW-") && !strings.HasPrefix(chainString, "KUBE-XLB-") { // Ignore chains that aren't ours. continue } // We must (as per iptables) write a chain-line for it, which has // the nice effect of flushing the chain. Then we can remove the // chain. writeLine(natChains, existingNATChains[chain]) writeLine(natRules, "-X", chainString) } } // Finally, tail-call to the nodeports chain. This needs to be after all // other service portal rules. writeLine(natRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", `"kubernetes service nodeports; NOTE: this must be the last rule in this chain"`, "-m", "addrtype", "--dst-type", "LOCAL", "-j", string(kubeNodePortsChain)) // Write the end-of-table markers. writeLine(filterRules, "COMMIT") writeLine(natRules, "COMMIT") // Sync rules. // NOTE: NoFlushTables is used so we don't flush non-kubernetes chains in the table. filterLines := append(filterChains.Bytes(), filterRules.Bytes()...) natLines := append(natChains.Bytes(), natRules.Bytes()...) lines := append(filterLines, natLines...) glog.V(3).Infof("Restoring iptables rules: %s", lines) err = proxier.iptables.RestoreAll(lines, utiliptables.NoFlushTables, utiliptables.RestoreCounters) if err != nil { glog.Errorf("Failed to execute iptables-restore: %v\nRules:\n%s", err, lines) // Revert new local ports. revertPorts(replacementPortsMap, proxier.portsMap) return } // Close old local ports and save new ones. for k, v := range proxier.portsMap { if replacementPortsMap[k] == nil { v.Close() } } proxier.portsMap = replacementPortsMap }
感谢各位的阅读,以上就是“kube-proxy怎么使用”的内容了,经过本文的学习后,相信大家对kube-proxy怎么使用这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是创新互联,小编将为大家推送更多相关知识点的文章,欢迎关注!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流