CNI(the Container Network Interface),用来抽象容器网络接口,更好的拆分网络功能和接入网络方案

CNI调用

CNI的调用是集中在CRI部分,在kubelet调用CRI时,里面有一组grpc接口,其中在createsandbox时,进行CNI调用

所以更换CRI可能会对CNI调用有影响

CRI对CNI的调用是通过可执行文件调用并传参执行,所以CNI一定是编译成可执行文件后放在指定目录(可配置)下才能正常调用

CNI组成

CNI分为main plugin和IPAM两部分,main plugin负责网络创建的主要逻辑,IPAM则主要是IP分配相关

main plugin

目前官方提供了很多plugin,具体参考plugin,以及一些开源的,如我们使用的sriov和经典的flannelcalico等cni插件

IPAM

官方同样提供了多个IPAM选择,参考IPAM, ipam的实现比较简单,就是按照传入的参数,分配IP,保证IP分配正确不重合即可,可以使用分布式存储来保存状态,甚至可以使用本地文件保存

sriov实现

我们以sriovodin-ipam来分析如何编写cni插件

main实现

入口函数

1
2
3
func main() {
skel.PluginMain(cmdAdd, cmdDel, version.Legacy)
}

也就是我们需要实现cmdAdd,cmdDel两个函数,新版的cni还需要实现cmdGet,不过不经常使用
cmdAdd就是在新建容器过程中调用,它的流程逻辑是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
func cmdAdd(args *skel.CmdArgs) error {

// 1.解析CNI配置
n, err := LoadConf(args.StdinData, args.Args)
......

// 获取新建容器创建的network namespace
netns, err := ns.GetNS(args.Netns)
......

// 2.根据配置分别进行网卡设置
// 这里跟sriov特性有关,是只使用PF,还是创建VF虚拟化来用
if n.Net.PFOnly != true {
if err = setupVF(n, args.IfName, netns); err != nil {
return err
}
} else {
if err = setupPF(n, args.IfName, netns); err != nil {
return err
}
}

// 3. 调用IPAM分配ip,就是调用下面ipam的cmdAdd
result, err := ipam.ExecAdd(n.Net.IPAM.Type, args.StdinData)
if err != nil {
return fmt.Errorf("IPAM plugin returned err: %s", err)
}
if result.IP4 == nil {
return errors.New("IPAM plugin returned missing IPv4 config")
}

// 4.把分配的ip配置在设置好的容器内网卡上
err = netns.Do(func(_ ns.NetNS) error {

if err := ipam.ConfigureIface(args.IfName, result); err != nil {
return err
}

contVeth, err := net.InterfaceByName(args.IfName)
if err != nil {
return err
}

logrus.Infof("arping for ip:%+v with ifveth:%+v", result.IP4.IP.IP, *contVeth)

return arping.GratuitousArpOverIface(result.IP4.IP.IP, *contVeth)
})
if err != nil {
return err
}

result.DNS = n.Net.DNS
// 5 打印结果,让cri使用
return result.Print()
}

可以看到main 部分主要实现的功能就是配置解析和容器内网卡设置,这里和要实现网络紧密相关。

cmdDel则是在容器被销毁中调用,用来清理网络设置和ip

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
func cmdDel(args *skel.CmdArgs) error {
......
// 1.解析CNI配置
n, err := LoadConf(args.StdinData, args.Args)
......
// 获取新建容器创建的network namespace
netns, err := ns.GetNS(args.Netns)
if err != nil {

_, ok := err.(ns.NSPathNotExistErr)
if ok {
return nil
}

return fmt.Errorf("failed to open netns %q: %v", netns, err)
}
defer netns.Close()

// 2. 根据配置分别进行网卡设置释放
if n.Net.PFOnly != true {
if err = releaseVF(n, args.IfName, netns); err != nil {
return err
}
} else {
if err = releasePF(n, args.IfName, netns); err != nil {
return err
}
}

// 3. 调用IPAM释放ip,就是调用下面ipam的cmdDel
err = ipam.ExecDel(n.Net.IPAM.Type, args.StdinData)
if err != nil {
return err
}

return nil
}

可以看到基本逻辑与cmdAdd类似,只是由创建改为释放网络,并调用ipam释放ip

IPAM 编写

入口函数

1
2
3
func main() {
skel.PluginMain(cmdAdd, cmdGet, cmdDel, version.All, about)
}

也就是我们需要实现cmdAdd,cmdDel,cmdGet三个函数,
cmdAdd就是用来生成一个新ip,它的流程逻辑是

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
func cmdAdd(args *skel.CmdArgs) error {
// 1. 解析cni中ipam部分的配置
ipamConf, _, err := allocator.LoadIPAMConfig(args.StdinData, args.Args)
if err != nil {
return err
}
......
// 2. 初始化backend,这里我们使用etcd作为ip状态存储
store, err := etcd.NewEtcdStore(ipamConf.EtcdEndpoints, ipamConf.EtcdPrefix, ipamConf.CAFile, ipamConf.CertFile, ipamConf.KeyFile)

store.DoSession()

defer store.Close()
allocs := []*allocator.IPAllocator{}

// Store all requested IPs in a map, so we can easily remove ones we use
// and error if some remain
requestedIPs := map[string]net.IP{} //net.IP cannot be a key

for _, ip := range ipamConf.IPArgs {
requestedIPs[ip.String()] = ip
}

// 3. 下面就是根据各种配置查询并分配新ip,具体逻辑其实不复杂

var r *current.IPConfig
e := allocator.IPAMEnvArgs{}
if args.Args != "" {
err := types.LoadArgs(args.Args, &e)
if err == nil {
if re, err := store.CheckPodName(podName(string(e.K8S_POD_NAME), string(e.K8S_POD_NAMESPACE), ipamConf.IDC, ipamConf.VLANID)); err == nil {
r = re
}
}
}
if r == nil {
for idx, rangeset := range ipamConf.Ranges {
allocator := allocator.NewIPAllocator(&rangeset, store, idx)

// Check to see if there are any custom IPs requested in this range.
var requestedIP net.IP
for k, ip := range requestedIPs {
if rangeset.Contains(ip) {
requestedIP = ip
delete(requestedIPs, k)
break
}
}

ipConf, err := allocator.Get(args.ContainerID, requestedIP)
if err != nil {
// Deallocate all already allocated IPs
for _, alloc := range allocs {
_ = alloc.Release(args.ContainerID)
}
return fmt.Errorf("failed to allocate for range %d: %v", idx, err)
}

allocs = append(allocs, allocator)

result.IPs = append(result.IPs, ipConf)

if err := store.ReservePodName(podName(string(e.K8S_POD_NAME), string(e.K8S_POD_NAMESPACE), ipamConf.IDC, ipamConf.VLANID), ipConf); err != nil {
log.Printf("[odin-ipam] cmd add reserve podname :%s failed: %+v", e.K8S_POD_NAME, err)
}
}
} else {
if store.CheckPodIP(r.Address.IP.String()) != nil {
store.ReleasePodName(podName(string(e.K8S_POD_NAME), string(e.K8S_POD_NAMESPACE), ipamConf.IDC, ipamConf.VLANID))
return fmt.Errorf( "ip %s already taken, may be u need another ip", r.Address.IP.String())
}
result.IPs = append(result.IPs, r)
}

// If an IP was requested that wasn't fulfilled, fail
if len(requestedIPs) != 0 {
for _, alloc := range allocs {
_ = alloc.Release(args.ContainerID)
}
errstr := "failed to allocate all requested IPs:"
for _, ip := range requestedIPs {
errstr = errstr + " " + ip.String()
}
return fmt.Errorf(errstr)
}

result.Routes = ipamConf.Routes
oldResult, err := types020.GetResult(result)
if err != nil {
return err
}

return oldResult.Print()
}

cmdAdd整体也不复杂,主要就是遍历cidr里所有ip并对比存储里ip状态,看哪些ip可以分配,然后就分配一个出来,相应的cmdDel逻辑就更简单了,就是把指定的ip状态删除,这样就可以在后续继续使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func cmdDel(args *skel.CmdArgs) error {
ipamConf, _, err := allocator.LoadIPAMConfig(args.StdinData, args.Args)
if err != nil {
return err
}

store, err := etcd.NewEtcdStore(ipamConf.EtcdEndpoints, ipamConf.EtcdPrefix, ipamConf.CAFile, ipamConf.CertFile, ipamConf.KeyFile)

if err != nil {
return err
}

store.DoSession()
defer store.Close()

// Loop through all ranges, releasing all IPs, even if an error occurs
var errors []string
for idx, rangeset := range ipamConf.Ranges {
ipAllocator := allocator.NewIPAllocator(&rangeset, store, idx)

err := ipAllocator.Release(args.ContainerID)
if err != nil {
errors = append(errors, err.Error())
}
}

if errors != nil {
return fmt.Errorf(strings.Join(errors, ";"))
}
log.Printf("[odin-ipam]cmd del release ContainerID:%s succ", args.ContainerID)
return nil
}