root@ubuntu:~/sample-controller# go mod vendor go: downloading k8s.io/code-generator v0.0.0-20210701054009-d874928e3dc5 go: downloading github.com/evanphx/json-patch v4.11.0+incompatible go: downloading github.com/pkg/errors v0.9.1 go: downloading github.com/go-openapi/jsonreference v0.19.5 go: downloading github.com/go-openapi/swag v0.19.14 go: downloading github.com/go-openapi/jsonpointer v0.19.5 go: downloading github.com/mailru/easyjson v0.7.6 go: downloading github.com/josharian/intern v1.0.0 root@ubuntu:~/sample-controller# bash hack/update-codegen.sh Generating deepcopy funcs Generating clientset for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/generated/clientset Generating listers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/generated/listers Generating informers for samplecontroller:v1alpha1 at k8s.io/sample-controller/pkg/generated/informers root@ubuntu:~/sample-controller#
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get crds NAME CREATED AT bgpconfigurations.crd.projectcalico.org 2021-07-01T09:21:58Z bgppeers.crd.projectcalico.org 2021-07-01T09:21:58Z blockaffinities.crd.projectcalico.org 2021-07-01T09:21:58Z clusterinformations.crd.projectcalico.org 2021-07-01T09:21:58Z commands.bus.volcano.sh 2021-07-05T07:47:45Z felixconfigurations.crd.projectcalico.org 2021-07-01T09:21:58Z foos.samplecontroller.k8s.io 2021-07-06T03:20:25Z globalnetworkpolicies.crd.projectcalico.org 2021-07-01T09:21:58Z globalnetworksets.crd.projectcalico.org 2021-07-01T09:21:58Z hostendpoints.crd.projectcalico.org 2021-07-01T09:21:58Z ipamblocks.crd.projectcalico.org 2021-07-01T09:21:58Z ipamconfigs.crd.projectcalico.org 2021-07-01T09:21:58Z ipamhandles.crd.projectcalico.org 2021-07-01T09:21:58Z ippools.crd.projectcalico.org 2021-07-01T09:21:58Z jobs.batch.volcano.sh 2021-07-05T07:47:45Z kubecontrollersconfigurations.crd.projectcalico.org 2021-07-01T09:21:58Z networkpolicies.crd.projectcalico.org 2021-07-01T09:21:58Z networksets.crd.projectcalico.org 2021-07-01T09:21:58Z podgroups.scheduling.incubator.k8s.io 2021-07-05T06:53:38Z podgroups.scheduling.sigs.dev 2021-07-05T06:53:38Z podgroups.scheduling.volcano.sh 2021-07-05T07:47:45Z queues.scheduling.incubator.k8s.io 2021-07-05T06:53:38Z queues.scheduling.sigs.dev 2021-07-05T06:53:38Z queues.scheduling.volcano.sh 2021-07-05T07:47:45Z root@ubuntu:~/sample-controller/artifacts/examples# ls crd-status-subresource.yaml crd.yaml example-foo.yaml root@ubuntu:~/sample-controller/artifacts/examples# cat crd.yaml apiVersion: apiextensions.k8s.io/v1 kind: CustomResourceDefinition metadata: name: foos.samplecontroller.k8s.io # for more information on the below annotation, please see # https://github.com/kubernetes/enhancements/blob/master/keps/sig-api-machinery/2337-k8s.io-group-protection/README.md annotations: "api-approved.kubernetes.io": "unapproved, experimental-only; please get an approval from 
Kubernetes API reviewers if you're trying to develop a CRD in the *.k8s.io or *.kubernetes.io groups" spec: group: samplecontroller.k8s.io versions: - name: v1alpha1 served: true storage: true schema: # schema used for validation openAPIV3Schema: type: object properties: spec: type: object properties: deploymentName: type: string replicas: type: integer minimum: 1 maximum: 10 status: type: object properties: availableReplicas: type: integer names: kind: Foo plural: foos scope: Namespaced root@ubuntu:~/sample-controller/artifacts/examples# kubectl get crds | grep foos foos.samplecontroller.k8s.io 2021-07-06T03:20:25Z
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get crds | grep foos foos.samplecontroller.k8s.io 2021-07-06T03:20:25Z root@ubuntu:~/sample-controller/artifacts/examples# kubectl create -f example-foo.yaml foo.samplecontroller.k8s.io/example-foo created root@ubuntu:~/sample-controller/artifacts/examples# cat example-foo.yaml apiVersion: samplecontroller.k8s.io/v1alpha1 kind: Foo metadata: name: example-foo spec: deploymentName: example-foo replicas: 1 root@ubuntu:~/sample-controller/artifacts/examples# kubectl get Foo NAME AGE example-foo 2m2s root@ubuntu:~/sample-controller/artifacts/examples# kubectl describe example-foo error: the server doesn't have a resource type "example-foo" root@ubuntu:~/sample-controller/artifacts/examples# kubectl describe Foo Name: example-foo Namespace: default Labels: <none> Annotations: <none> API Version: samplecontroller.k8s.io/v1alpha1 Kind: Foo Metadata: Creation Timestamp: 2021-07-06T03:23:49Z Generation: 1 Managed Fields: API Version: samplecontroller.k8s.io/v1alpha1 Fields Type: FieldsV1 fieldsV1: f:spec: .: f:deploymentName: f:replicas: Manager: kubectl Operation: Update Time: 2021-07-06T03:23:49Z Resource Version: 1111619 Self Link: /apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/example-foo UID: b91554b5-58fa-4b08-a011-edaf992b60de Spec: Deployment Name: example-foo Replicas: 1 Events: <none> root@ubuntu:~/sample-controller/artifacts/examples# cat example-foo.yaml apiVersion: samplecontroller.k8s.io/v1alpha1 kind: Foo metadata: name: example-foo spec: deploymentName: example-foo replicas: 1 root@ubuntu:~/sample-controller/artifacts/examples#
运行
./sample-controller -kubeconfig=$HOME/.kube/config
之前
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments No resources found in default namespace. root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments No resources found in default namespace.
运行
./sample-controller -kubeconfig=$HOME/.kube/config
root@ubuntu:~/sample-controller# ./sample-controller -kubeconfig=$HOME/.kube/config I0706 11:36:14.120260 61579 controller.go:115] Setting up event handlers I0706 11:36:14.120399 61579 controller.go:156] Starting Foo controller I0706 11:36:14.120412 61579 controller.go:159] Waiting for informer caches to sync I0706 11:36:14.220565 61579 controller.go:164] Starting workers I0706 11:36:14.220596 61579 controller.go:170] Started workers I0706 11:36:14.246385 61579 controller.go:228] Successfully synced 'default/example-foo' I0706 11:36:14.246489 61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" I0706 11:36:14.250982 61579 controller.go:228] Successfully synced 'default/example-foo' I0706 11:36:14.251023 61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" I0706 11:36:14.256740 61579 controller.go:228] Successfully synced 'default/example-foo' I0706 11:36:14.256772 61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" I0706 11:36:14.276593 61579 controller.go:228] Successfully synced 'default/example-foo' I0706 11:36:14.276643 61579 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully"
运行
./sample-controller -kubeconfig=$HOME/.kube/config
之后
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments NAME READY UP-TO-DATE AVAILABLE AGE example-foo 0/1 1 0 5s root@ubuntu:~/sample-controller/artifacts/examples#
io port
root@ubuntu:~# netstat -pan | grep sample tcp 0 0 10.10.16.82:34178 10.10.16.249:6443 ESTABLISHED 56698/./sample-cont root@ubuntu:~# ip a sh | grep 10.10.16.249 root@ubuntu:~# telnet 10.10.16.249 6443 Trying 10.10.16.249... Connected to 10.10.16.249. Escape character is '^]'. ^CConnection closed by foreign host. root@ubuntu:~# sp -elf | grep 56698 sp: command not found root@ubuntu:~# ps -elf | grep 56698 0 S root 56698 54998 0 80 0 - 517136 futex_ 11:49 pts/0 00:00:00 ./sample-controller -kubeconfig=/root/.kube/config 0 S root 60412 56743 0 80 0 - 1096 pipe_w 11:53 pts/1 00:00:00 grep 56698 root@ubuntu:~#
10.10.16.249是keepalived的VIP
[root@centos7 ~]# ip a sh enp125s0f0 2: enp125s0f0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc pfifo_fast state UP group default qlen 1000 link/ether b0:08:75:5f:b8:5b brd ff:ff:ff:ff:ff:ff inet 10.10.16.251/24 scope global enp125s0f0 valid_lft forever preferred_lft forever inet 10.10.16.249/32 scope global enp125s0f0 valid_lft forever preferred_lft forever [root@centos7 ~]# netstat -pan | grep 6449 [root@centos7 ~]# netstat -pan | grep 6443 tcp 0 0 10.10.16.249:54830 10.10.16.249:6443 ESTABLISHED 112750/kube-control tcp 0 0 10.10.16.249:41322 10.10.16.249:6443 ESTABLISHED 112750/kube-control tcp 0 0 10.10.16.249:41328 10.10.16.249:6443 ESTABLISHED 112872/kube-schedul tcp 0 1 10.10.16.251:48116 10.10.18.46:6443 SYN_SENT 25931/haproxy tcp 0 0 10.10.16.249:38122 10.10.16.249:6443 ESTABLISHED 20496/kubelet tcp 0 0 10.10.16.249:58900 10.10.16.249:6443 ESTABLISHED 21108/kube-proxy tcp 0 0 10.10.16.249:41330 10.10.16.249:6443 ESTABLISHED 112872/kube-schedul tcp6 0 0 :::6443 :::* LISTEN 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.81:17642 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.82:34178 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.82:46763 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.249:54830 ESTABLISHED 21302/kube-apiserve tcp6 0 0 ::1:43226 ::1:6443 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.251:57395 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.81:3112 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.82:45391 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.251:56994 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.82:43286 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.244.129.129:50662 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.47:59912 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.82:52248 ESTABLISHED 
21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.47:59914 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.249:41328 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.82:38136 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.82:50596 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.81:30491 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.249:41322 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.82:44579 ESTABLISHED 21302/kube-apiserve tcp6 0 0 ::1:6443 ::1:43226 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.249:38122 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.81:40659 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.82:39354 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.249:41330 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.47:46764 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.81:60932 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.82:43290 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.47:41240 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.81:46054 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.47:40538 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.249:58900 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.251:6443 10.10.16.81:61069 ESTABLISHED 21302/kube-apiserve tcp6 0 0 10.10.16.249:6443 10.10.16.47:33230 ESTABLISHED 21302/kube-apiserve [root@centos7 ~]#
也就是说,sample-controller不直接访问etcd,而是通过apiserver访问
root@ubuntu:~/sample-controller/artifacts/examples# kubectl get deployments NAME READY UP-TO-DATE AVAILABLE AGE example-foo 0/1 1 0 5s root@ubuntu:~/sample-controller/artifacts/examples# kubectl get pods NAME READY STATUS RESTARTS AGE example-foo-54dc4db9fc-9t7br 1/1 Running 0 6m23s test-job-default-nginx-0 1/1 Running 0 18h test-job-default-nginx-1 1/1 Running 0 18h test-job-default-nginx-2 1/1 Running 0 18h test-job-default-nginx-3 1/1 Running 0 18h test-job-default-nginx-4 1/1 Running 0 18h test-job-default-nginx-5 1/1 Running 0 18h root@ubuntu:~/sample-controller/artifacts/examples# kubectl describe pods example-foo-54dc4db9fc-9t7br Name: example-foo-54dc4db9fc-9t7br Namespace: default Priority: 0 Node: bogon/10.10.16.81 , controller 运行在ubuntu Start Time: Tue, 06 Jul 2021 11:36:14 +0800 Labels: app=nginx controller=example-foo pod-template-hash=54dc4db9fc Annotations: cni.projectcalico.org/podIP: 10.244.29.17/32 cni.projectcalico.org/podIPs: 10.244.29.17/32 Status: Running IP: 10.244.29.17 IPs: IP: 10.244.29.17 Controlled By: ReplicaSet/example-foo-54dc4db9fc Containers: nginx: Container ID: docker://83a4cf68e72bea51805126a8b1f4d2760aae38228a9123f2fffff730508fffce Image: nginx:latest Image ID: docker-pullable://nginx@sha256:47ae43cdfc7064d28800bc42e79a429540c7c80168e8c8952778c0d5af1c09db Port: <none> Host Port: <none> State: Running Started: Tue, 06 Jul 2021 11:37:31 +0800 Ready: True Restart Count: 0 Environment: <none> Mounts: /var/run/secrets/kubernetes.io/serviceaccount from default-token-cfr6q (ro) Conditions: Type Status Initialized True Ready True ContainersReady True PodScheduled True Volumes: default-token-cfr6q: Type: Secret (a volume populated by a Secret) SecretName: default-token-cfr6q Optional: false QoS Class: BestEffort Node-Selectors: <none> Tolerations: node.kubernetes.io/not-ready:NoExecute for 300s node.kubernetes.io/unreachable:NoExecute for 300s Events: Type Reason Age From Message ---- ------ ---- ---- ------- Normal Scheduled <unknown> 
default-scheduler Successfully assigned default/example-foo-54dc4db9fc-9t7br to bogon Normal Pulling 6m45s kubelet, bogon Pulling image "nginx:latest" Normal Pulled 5m31s kubelet, bogon Successfully pulled image "nginx:latest" Normal Created 5m30s kubelet, bogon Created container nginx Normal Started 5m30s kubelet, bogon Started container nginx root@ubuntu:~/sample-controller/artifacts/examples#
nginx
// newDeployment creates a new Deployment for a Foo resource. It also sets
// the appropriate OwnerReferences on the resource so handleObject can discover
// the Foo resource that 'owns' it.
//
// The returned Deployment is only built in memory; this function does not
// send anything to the API server.
func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment {
	// The same label set is used below for both the Deployment selector and
	// the pod template, so the Deployment selects the pods it creates.
	labels := map[string]string{
		"app":        "nginx",
		"controller": foo.Name,
	}
	return &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			// The Deployment name comes from the Foo spec, not from the
			// Foo's own name; the namespace is the Foo's namespace.
			Name:      foo.Spec.DeploymentName,
			Namespace: foo.Namespace,
			// Record the Foo (kind "Foo" in this package's group/version)
			// as the controlling owner of the Deployment.
			OwnerReferences: []metav1.OwnerReference{
				*metav1.NewControllerRef(foo, samplev1alpha1.SchemeGroupVersion.WithKind("Foo")),
			},
		},
		Spec: appsv1.DeploymentSpec{
			// The replica count is taken directly from the Foo spec.
			Replicas: foo.Spec.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: labels,
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: labels,
				},
				Spec: corev1.PodSpec{
					// A single container; its name and image are hard-coded.
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx:latest",
						},
					},
				},
			},
		},
	}
}
root@ubuntu:/opt/gopath/src/github.com/go-delve/delve# ps -elf | grep sample 0 S root 20518 11022 0 80 0 - 1096 pipe_w 11:59 pts/2 00:00:00 grep sample 0 S root 61579 29030 0 80 0 - 590852 futex_ 11:36 pts/0 00:00:00 ./sample-controller -kubeconfig=/root/.kube/config root@ubuntu:/opt/gopath/src/github.com/go-delve/delve# dlv attach 61579 Type 'help' for list of commands. (dlv) b newDeployment Breakpoint 1 (enabled) set at 0xeea018 for main.newDeployment() /root/sample-controller/controller.go:391 (dlv) r Command failed: cannot restart process Delve did not create (dlv) c > main.newDeployment() /root/sample-controller/controller.go:391 (hits goroutine(135):1 total:1) (PC: 0xeea018) Warning: debugging optimized function 386: } 387: 388: // newDeployment creates a new Deployment for a Foo resource. It also sets 389: // the appropriate OwnerReferences on the resource so handleObject can discover 390: // the Foo resource that 'owns' it. => 391: func newDeployment(foo *samplev1alpha1.Foo) *appsv1.Deployment { 392: labels := map[string]string{ 393: "app": "nginx", 394: "controller": foo.Name, 395: } 396: return &appsv1.Deployment{ (dlv) bt 0 0x0000000000eea018 in main.newDeployment at /root/sample-controller/controller.go:391 1 0x0000000000ee9580 in main.(*Controller).syncHandler at /root/sample-controller/controller.go:277 2 0x0000000000eeae38 in main.(*Controller).processNextWorkItem.func1 at /root/sample-controller/controller.go:220 3 0x0000000000ee8e70 in main.(*Controller).processNextWorkItem at /root/sample-controller/controller.go:230 4 0x0000000000ee8e00 in main.(*Controller).runWorker at /root/sample-controller/controller.go:181 5 0x0000000000eeb260 in main.(*Controller).runWorker-fm at /root/sample-controller/controller.go:180 6 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 7 0x00000000006b7eac in 
k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 8 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 9 0x00000000006b7d80 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 10 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) quit
controller.Run(2, stopCh)
// Run waits for the informer caches to sync, starts `threadiness` worker
// goroutines, and then blocks until stopCh is closed. It returns an error
// only if the caches fail to sync; otherwise it returns nil after stopCh
// is closed. The workqueue is shut down when Run returns.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
	defer utilruntime.HandleCrash()
	defer c.workqueue.ShutDown()

	// Only a log line is emitted here; this function does not start the
	// informer factories itself.
	klog.Info("Starting Foo controller")

	// Wait for the caches to be synced before starting workers
	klog.Info("Waiting for informer caches to sync")
	if ok := cache.WaitForCacheSync(stopCh, c.deploymentsSynced, c.foosSynced); !ok {
		return fmt.Errorf("failed to wait for caches to sync")
	}

	klog.Info("Starting workers")
	// Launch `threadiness` workers to process Foo resources. Each worker
	// goroutine runs c.runWorker through wait.Until, with a period of one
	// second and stopCh as its stop channel.
	for i := 0; i < threadiness; i++ {
		go wait.Until(c.runWorker, time.Second, stopCh)
	}

	klog.Info("Started workers")
	// Block until the stop channel is closed.
	<-stopCh
	klog.Info("Shutting down workers")

	return nil
}

// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue. The loop ends as soon as processNextWorkItem returns false.
func (c *Controller) runWorker() {
	for c.processNextWorkItem() {
	}
}
cache.SharedIndexInformer
NewInformerFunc
pkg/generated/informers/externalversions/internalinterfaces/factory_interfaces.go
// NewInformerFunc takes versioned.Interface and time.Duration to return a SharedIndexInformer.
type NewInformerFunc func(versioned.Interface, time.Duration) cache.SharedIndexInformer

// SharedInformerFactory is a small interface that allows adding an informer
// without an import cycle.
type SharedInformerFactory interface {
	// Start takes a stop channel.
	// NOTE(review): presumably starts the registered informers and stops
	// them when stopCh is closed; confirm against the implementation.
	Start(stopCh <-chan struct{})
	// InformerFor returns a SharedIndexInformer for obj; newFunc is the
	// constructor supplied by the caller.
	// NOTE(review): presumably newFunc is only invoked when no informer for
	// obj's type exists yet; confirm against the implementation.
	InformerFor(obj runtime.Object, newFunc NewInformerFunc) cache.SharedIndexInformer
}
pkg/generated/informers/externalversions/factory.go
// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
	// Embeds the internal factory interface from the internalinterfaces
	// package.
	internalinterfaces.SharedInformerFactory
	// ForResource returns a GenericInformer for the given
	// group/version/resource, or an error.
	ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
	// WaitForCacheSync takes a stop channel and returns a map keyed by
	// informer object type.
	// NOTE(review): presumably each value reports whether that informer's
	// cache synced; confirm against the implementation.
	WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

	// Samplecontroller gives access to the informers of the
	// samplecontroller API group.
	Samplecontroller() samplecontroller.Interface
}
kube-batch 有多个informer对象
client/informers/externalversions/scheduling/v1alpha2/queue.go:37: Informer() cache.SharedIndexInformer client/informers/externalversions/scheduling/v1alpha2/queue.go:49:func NewQueueInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { client/informers/externalversions/scheduling/v1alpha2/queue.go:56:func NewFilteredQueueInformer(client versioned.Interface, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { client/informers/externalversions/scheduling/v1alpha2/queue.go:78:func (f *queueInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { client/informers/externalversions/scheduling/v1alpha2/queue.go:82:func (f *queueInformer) Informer() cache.SharedIndexInformer { client/informers/externalversions/scheduling/v1alpha2/podgroup.go:37: Informer() cache.SharedIndexInformer client/informers/externalversions/scheduling/v1alpha2/podgroup.go:50:func NewPodGroupInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { client/informers/externalversions/scheduling/v1alpha2/podgroup.go:57:func NewFilteredPodGroupInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { client/informers/externalversions/scheduling/v1alpha2/podgroup.go:79:func (f *podGroupInformer) defaultInformer(client versioned.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { client/informers/externalversions/scheduling/v1alpha2/podgroup.go:83:func (f *podGroupInformer) Informer() cache.SharedIndexInformer {
NewSharedInformerFactory
import ( "flag" "time" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog/v2" // Uncomment the following line to load the gcp plugin (only required to authenticate against GKE clusters). // _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" clientset "k8s.io/sample-controller/pkg/generated/clientset/versioned" informers "k8s.io/sample-controller/pkg/generated/informers/externalversions" "k8s.io/sample-controller/pkg/signals" )
// main wires the controller together: it parses flags, builds the two
// clientsets and their shared informer factories, constructs the controller,
// starts both factories and finally runs the controller until it returns.
func main() {
	klog.InitFlags(nil)
	flag.Parse()

	// set up signals so we handle the first shutdown signal gracefully
	stopCh := signals.SetupSignalHandler()

	// NOTE(review): masterURL and kubeconfig are not defined in this
	// snippet; presumably package-level variables bound to command-line
	// flags. Confirm in the full file.
	cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
	if err != nil {
		klog.Fatalf("Error building kubeconfig: %s", err.Error())
	}

	// Clientset for the built-in Kubernetes API types.
	kubeClient, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
	}

	// Generated clientset for the sample-controller API group.
	exampleClient, err := clientset.NewForConfig(cfg)
	if err != nil {
		klog.Fatalf("Error building example clientset: %s", err.Error())
	}

	// One shared informer factory per clientset; 30 seconds is passed as the
	// second (duration) argument of each factory constructor.
	kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

	// The controller receives the Deployment informer and the Foo informer.
	controller := NewController(kubeClient, exampleClient,
		kubeInformerFactory.Apps().V1().Deployments(),
		exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

	// notice that there is no need to run Start methods in a separate goroutine (i.e. go kubeInformerFactory.Start(stopCh)).
	// Start method is non-blocking and runs all registered informers in a dedicated goroutine.
	kubeInformerFactory.Start(stopCh)
	exampleInformerFactory.Start(stopCh)

	// 2 is passed as Run's first argument; any error returned by Run is
	// fatal.
	if err = controller.Run(2, stopCh); err != nil {
		klog.Fatalf("Error running controller: %s", err.Error())
	}
}
root@ubuntu:~/sample-controller# ls pkg/generated/informers/externalversions/
factory.go generic.go internalinterfaces samplecontroller
root@ubuntu:~/sample-controller#
ls pkg/generated/informers/externalversions/
// NewSharedInformerFactoryWithOptions constructs a new instance of a SharedInformerFactory with additional options.
//
// client and defaultResync are stored on the factory as given; each entry of
// options is a function that receives the factory and returns the factory to
// use from then on.
func NewSharedInformerFactoryWithOptions(client versioned.Interface, defaultResync time.Duration, options ...SharedInformerOption) SharedInformerFactory {
	// Defaults: the namespace is v1.NamespaceAll, and the three maps, all
	// keyed by reflect.Type, start out empty.
	factory := &sharedInformerFactory{
		client:           client,
		namespace:        v1.NamespaceAll,
		defaultResync:    defaultResync,
		informers:        make(map[reflect.Type]cache.SharedIndexInformer),
		startedInformers: make(map[reflect.Type]bool),
		customResync:     make(map[reflect.Type]time.Duration),
	}

	// Apply all options, in the order given; each option's return value
	// replaces the factory.
	for _, opt := range options {
		factory = opt(factory)
	}

	return factory
}
package main

import (
	"k8s.io/client-go/informers"
)

// main is an illustrative sketch of how a shared informer factory is used.
// NOTE(review): the snippet is not self-contained. client, ResyncPeriod,
// stopChannel, podLister, servicesLister, sListerSynced, sInformer and dc are
// not declared here; presumably they come from the surrounding program.
func main() {
	// client is the kube-apiserver client, needed because data is pulled from
	// the kube-apiserver. ResyncPeriod is the interval at which data is
	// pulled again; it is discussed in more detail later.
	sharedInformers := informers.NewSharedInformerFactory(client,ResyncPeriod)
	// Watch Pod resources.
	podInformer := sharedInformers.Core().V1().Pods()
	// Watch Service resources.
	servicesInformer := sharedInformers.Core().V1().Services()

	podLister = podInformer.Lister()
	servicesLister = servicesInformer.Lister()
	// NOTE(review): sInformer is not declared in this snippet; it looks like
	// servicesInformer was intended. Confirm against the original example.
	sListerSynced = sInformer.Informer().HasSynced
	dc.podListerSynced = podInformer.Informer().HasSynced

	// Start the informers for every resource.
	sharedInformers.Start(stopChannel)
}
vi pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go
root@ubuntu:~/sample-controller# dlv exec ./sample-controller -- -kubeconfig=$HOME/.kube/config Type 'help' for list of commands. (dlv) b NewFilteredFooInformer Breakpoint 1 (enabled) set at 0xe3be48 for k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.NewFilteredFooInformer() ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58 (dlv) r Process restarted with PID 30188 (dlv) bt 0 0x0000ffff8b6b51c0 in ??? at ?:-1 error: input/output error (truncated) (dlv) r Process restarted with PID 30468 (dlv) b NewFilteredFooInformer Command failed: Breakpoint exists at /root/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58 at e3be48 (dlv) c > k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.NewFilteredFooInformer() ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58 (hits goroutine(1):1 total:1) (PC: 0xe3be48) Warning: debugging optimized function 53: } 54: 55: // NewFilteredFooInformer constructs a new informer for Foo type. 56: // Always prefer using an informer factory to get a shared informer instead of getting an independent 57: // one. This reduces memory footprint and number of connections to the server. 
=> 58: func NewFilteredFooInformer(client versioned.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { 59: return cache.NewSharedIndexInformer( 60: &cache.ListWatch{ 61: ListFunc: func(options v1.ListOptions) (runtime.Object, error) { 62: if tweakListOptions != nil { 63: tweakListOptions(&options) (dlv) bt 0 0x0000000000e3be48 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.NewFilteredFooInformer at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:58 1 0x0000000000e3c744 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).defaultInformer at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:81 2 0x0000000000e3c744 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).defaultInformer-fm at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:80 3 0x0000000000ee6880 in k8s.io/sample-controller/pkg/generated/informers/externalversions.(*sharedInformerFactory).InformerFor at ./pkg/generated/informers/externalversions/factory.go:162 4 0x0000000000e3c100 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).Informer at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:85 5 0x0000000000e3c170 in k8s.io/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1.(*fooInformer).Lister at ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go:89 6 0x0000000000ee8020 in main.NewController at ./controller.go:109 7 0x0000000000eea680 in main.main at ./main.go:65 8 0x0000000000044aa4 in runtime.main at /usr/local/go/src/runtime/proc.go:203 9 0x00000000000718dc in runtime.goexit at 
/usr/local/go/src/runtime/asm_arm64.s:1148
(dlv) b foo.go:82 Command failed: Location "foo.go:82" ambiguous: /root/sample-controller/pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go,
/root/sample-controller/pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go,
/root/sample-controller/pkg/generated/listers/samplecontroller/v1alpha1/foo.go… (dlv)
root@ubuntu:~/sample-controller# find ./ -name foo.go ./pkg/generated/informers/externalversions/samplecontroller/v1alpha1/foo.go ./pkg/generated/clientset/versioned/typed/samplecontroller/v1alpha1/foo.go ./pkg/generated/listers/samplecontroller/v1alpha1/foo.go
InformerFor
产生一个samplecontrollerv1alpha1.Foo{}对象 return f.factory.InformerFor(&samplecontrollerv1alpha1.Foo{}, f.defaultInformer)
generated/informers/externalversions/factory.go:147:func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
register.go
注册struct
参考 Accessing Kubernetes CRDs from the client-go package
package v1alpha1

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"

	samplecontroller "k8s.io/sample-controller/pkg/apis/samplecontroller"
)

// SchemeGroupVersion is group version used to register these objects.
// The group name comes from the samplecontroller package; the version is
// "v1alpha1".
var SchemeGroupVersion = schema.GroupVersion{Group: samplecontroller.GroupName, Version: "v1alpha1"}

// Kind takes an unqualified kind and returns back a Group qualified GroupKind
func Kind(kind string) schema.GroupKind {
	return SchemeGroupVersion.WithKind(kind).GroupKind()
}

// Resource takes an unqualified resource and returns a Group qualified GroupResource
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

var (
	// SchemeBuilder initializes a scheme builder; it is built from
	// addKnownTypes.
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	// AddToScheme is a global function that registers this API group & version to a scheme
	AddToScheme = SchemeBuilder.AddToScheme
)

// Adds the list of known types to Scheme: Foo and FooList are registered
// under SchemeGroupVersion, and the scheme and group version are then passed
// to metav1.AddToGroupVersion. Always returns nil.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&Foo{},
		&FooList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
kube batch
package v1alpha2

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

var (
	// SchemeBuilder is the scheme builder for this package, built from
	// addKnownTypes.
	SchemeBuilder = runtime.NewSchemeBuilder(addKnownTypes)
	// AddToScheme is SchemeBuilder's AddToScheme method, exposed as a
	// package-level function value.
	AddToScheme = SchemeBuilder.AddToScheme
)

const (
	// GroupName is the group name used in this package.
	GroupName = "scheduling.sigs.dev"

	// GroupVersion is the version of scheduling group
	GroupVersion = "v1alpha2"
)

// SchemeGroupVersion is the group version used to register these objects.
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: GroupVersion}

// Resource takes an unqualified resource and returns a Group-qualified GroupResource.
func Resource(resource string) schema.GroupResource {
	return SchemeGroupVersion.WithResource(resource).GroupResource()
}

// addKnownTypes adds the set of types defined in this package to the supplied scheme.
// It registers PodGroup, PodGroupList, Queue and QueueList under
// SchemeGroupVersion, then passes the scheme and group version to
// metav1.AddToGroupVersion. Always returns nil.
func addKnownTypes(scheme *runtime.Scheme) error {
	scheme.AddKnownTypes(SchemeGroupVersion,
		&PodGroup{},
		&PodGroupList{},
		&Queue{},
		&QueueList{},
	)
	metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
	return nil
}
调用 Add / Update / Delete 函数
// Fragment from the interior of a function (the enclosing function is not
// visible here). Each stanza obtains one typed informer from a shared
// informer factory, stores it on sc, and registers sc's Add/Update/Delete
// callbacks for that resource type.

// create informer for PodDisruptionBudget information
sc.pdbInformer = informerFactory.Policy().V1beta1().PodDisruptionBudgets()
sc.pdbInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPDB,
	UpdateFunc: sc.UpdatePDB,
	DeleteFunc: sc.DeletePDB,
})

// create informer for PriorityClass information
sc.pcInformer = informerFactory.Scheduling().V1beta1().PriorityClasses()
sc.pcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPriorityClass,
	UpdateFunc: sc.UpdatePriorityClass,
	DeleteFunc: sc.DeletePriorityClass,
})

// A second factory, built from sc.kbclient, serves the PodGroup and Queue
// informers below; 0 is passed as its second argument.
kbinformer := kbinfo.NewSharedInformerFactory(sc.kbclient, 0)
// create informer for PodGroup(v1alpha1) information
sc.podGroupInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().PodGroups()
sc.podGroupInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPodGroupAlpha1,
	UpdateFunc: sc.UpdatePodGroupAlpha1,
	DeleteFunc: sc.DeletePodGroupAlpha1,
})

// create informer for PodGroup(v1alpha2) information
sc.podGroupInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().PodGroups()
sc.podGroupInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddPodGroupAlpha2,
	UpdateFunc: sc.UpdatePodGroupAlpha2,
	DeleteFunc: sc.DeletePodGroupAlpha2,
})

// create informer for Queue(v1alpha1) information
sc.queueInformerv1alpha1 = kbinformer.Scheduling().V1alpha1().Queues()
sc.queueInformerv1alpha1.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddQueuev1alpha1,
	UpdateFunc: sc.UpdateQueuev1alpha1,
	DeleteFunc: sc.DeleteQueuev1alpha1,
})

// create informer for Queue(v1alpha2) information
sc.queueInformerv1alpha2 = kbinformer.Scheduling().V1alpha2().Queues()
sc.queueInformerv1alpha2.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    sc.AddQueuev1alpha2,
	UpdateFunc: sc.UpdateQueuev1alpha2,
	DeleteFunc: sc.DeleteQueuev1alpha2,
})
1-2 err := r.store.Add(event.Object) cache.(*DeltaFIFO).Add() 3-4-5 从 (*DeltaFIFO).Pop 存入一个map cache.(*threadSafeMap).Add 6-7 c.workqueue.Add 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error => 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return 344: } 345: c.workqueue.Add(key) 346: } 8-9 workqueue.(*Type).Get
DeltaFIFO
// DeltaFIFO, abridged: each "..." marks fields this excerpt leaves out.
type DeltaFIFO struct {
	...
	// items maps an object's key to the Deltas recorded for that object.
	items map[string]Deltas
	// queue holds object keys; per the notes below it is consumed first-in,
	// first-out.
	queue []string
	...
}

// Delta is a single change record: the kind of change plus the object it
// applies to.
type Delta struct {
	Type   DeltaType
	Object interface{}
}

// Deltas is the list of Delta records kept for one object.
type Deltas []Delta

// DeltaType names the kind of change carried by a Delta.
type DeltaType string

// Change type definition
const (
	Added   DeltaType = "Added"
	Updated DeltaType = "Updated"
	Deleted DeltaType = "Deleted"
	Sync    DeltaType = "Sync"
)
其中 queue 存储的是对象的 key(例如后文调试中看到的 `default/example-foo`),而 items 则以这个 key 为索引,存储该对象对应的事件(Delta)列表。
可以把它想象成这样一个数据结构:左边是 key,右边是一个数组,数组中的每个元素都由 Type 和 Object 两个字段组成。
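DeltaFIFO 顾名思义,是一个存放 Delta 数据的先入先出队列,相当于数据的中转站,负责把数据从一个地方转移到另一个地方:在本文的流程中,Reflector 的 watchHandler 把事件写入 DeltaFIFO,随后事件经 (*DeltaFIFO).Pop 取出并存入本地缓存(threadSafeMap)。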
watchHandler
// watchHandler drains events from the watch w and applies them to r.store
// (a *DeltaFIFO in the debugging session recorded in these notes), recording
// the resource version of each processed event in *resourceVersion.
//
// It returns:
//   - errorStopRequested when stopCh fires;
//   - the error received on errc;
//   - apierrors.FromObject(event.Object) for an event of type watch.Error;
//   - a "very short watch" error when the result channel closes within one
//     second of start and no event was processed;
//   - nil otherwise, once the result channel has been closed.
//
// Events whose Go type or GroupVersionKind does not match what the reflector
// expects are reported through utilruntime.HandleError and skipped.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0
	defer w.Stop() // stop the watch on every return path

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested // stop was requested via stopCh
		case err := <-errc: // error channel
			return err
		case event, ok := <-w.ResultChan(): // next event from the watch's result channel
			if !ok { // result channel has been closed
				break loop // leave the for loop, not just the select
			}
			if event.Type == watch.Error { // the event itself carries an error
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil { // the reflector was configured with an expected resource type
				// Skip the event when the object's Go type differs from the expected type.
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil { // skip the event when its GroupVersionKind differs from the expected one
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			// From here on the local variable meta shadows the meta package.
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type { // apply the matching add/update/delete to the store
			case watch.Added: // object created
				err := r.store.Add(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				err := r.store.Delete(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A Bookmark means the watch has synced up to this point; only the
				// resource version below is updated.
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// Store errors above are only reported; the resource version still advances.
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			// NOTE(review): the upstream listing in the dlv session (reflector.go:524-526)
			// also calls rvu.UpdateResourceVersion here when r.store implements
			// ResourceVersionUpdater; this excerpt omits that step.
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
// watchHandler drains events from the watch w and applies them to r.store
// (a *DeltaFIFO in the debugging session recorded in these notes), recording
// the resource version of each processed event in *resourceVersion.
//
// It returns:
//   - errorStopRequested when stopCh fires;
//   - the error received on errc;
//   - apierrors.FromObject(event.Object) for an event of type watch.Error;
//   - a "very short watch" error when the result channel closes within one
//     second of start and no event was processed;
//   - nil otherwise, once the result channel has been closed.
//
// Events whose Go type or GroupVersionKind does not match what the reflector
// expects are reported through utilruntime.HandleError and skipped.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0
	defer w.Stop() // stop the watch on every return path

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested // stop was requested via stopCh
		case err := <-errc: // error channel
			return err
		case event, ok := <-w.ResultChan(): // next event from the watch's result channel
			if !ok { // result channel has been closed
				break loop // leave the for loop, not just the select
			}
			if event.Type == watch.Error { // the event itself carries an error
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil { // the reflector was configured with an expected resource type
				// Skip the event when the object's Go type differs from the expected type.
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil { // skip the event when its GroupVersionKind differs from the expected one
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			// From here on the local variable meta shadows the meta package.
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type { // apply the matching add/update/delete to the store
			case watch.Added: // object created
				err := r.store.Add(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				err := r.store.Delete(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A Bookmark means the watch has synced up to this point; only the
				// resource version below is updated.
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// Store errors above are only reported; the resource version still advances.
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			// NOTE(review): the upstream listing in the dlv session (reflector.go:524-526)
			// also calls rvu.UpdateResourceVersion here when r.store implements
			// ResourceVersionUpdater; this excerpt omits that step.
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
root@ubuntu:~/sample-controller# dlv exec ./sample-controller -- --kubeconfig=$HOME/.kube/config Type 'help' for list of commands. (dlv) b reflector.go:348 Breakpoint 1 (enabled) set at 0xe1e51c for k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 (dlv) c I0709 15:21:03.739440 63308 controller.go:115] Setting up event handlers I0709 15:21:03.739604 63308 controller.go:156] Starting Foo controller I0709 15:21:03.739636 63308 controller.go:159] Waiting for informer caches to sync > k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 (hits goroutine(102):1 total:1) (PC: 0xe1e51c) Warning: debugging optimized function 343: if err != nil { 344: return fmt.Errorf("unable to understand list result %#v: %v", list, err) 345: } 346: resourceVersion = listMetaInterface.GetResourceVersion() 347: initTrace.Step("Resource version extracted") => 348: items, err := meta.ExtractList(list) 349: if err != nil { 350: return fmt.Errorf("unable to understand list result %#v (%v)", list, err) 351: } 352: initTrace.Step("Objects extracted") 353: if err := r.syncWith(items, resourceVersion); err != nil { (dlv) bt 0 0x0000000000e1e51c in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 1 0x0000000000e154a4 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:360 2 0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221 3 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at 
/opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 4 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 5 0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220 6 0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218 7 0x00000000006b8ce8 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56 8 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 9 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) p list k8s.io/apimachinery/pkg/runtime.Object(*k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooList) *{ TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""}, ListMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ListMeta { SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/foos", ResourceVersion: "1852450", Continue: "", RemainingItemCount: *int64 nil,}, Items: []k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo len: 1, cap: 1, [ (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo")(0x4000436000), ],} (dlv) n > k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:352 (PC: 0xe1e60c) Warning: debugging optimized function 347: initTrace.Step("Resource version extracted") 348: items, err := meta.ExtractList(list) 349: if err != 
nil { 350: return fmt.Errorf("unable to understand list result %#v (%v)", list, err) 351: } => 352: initTrace.Step("Objects extracted") 353: if err := r.syncWith(items, resourceVersion); err != nil { 354: return fmt.Errorf("unable to sync list result: %v", err) 355: } 356: initTrace.Step("SyncWith done") 357: r.setLastSyncResourceVersion(resourceVersion) (dlv) p items []k8s.io/apimachinery/pkg/runtime.Object len: 1, cap: 1, [ *k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo { TypeMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta")(0x4000436000), ObjectMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta")(0x4000436020), Spec: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec")(0x4000436118), Status: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus")(0x4000436130),}, ] (dlv) p items[0].TypeMeta k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta { Kind: "Foo", APIVersion: "samplecontroller.k8s.io/v1alpha1",} (dlv) p items[0].ObjectMeta k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta { Name: "example-foo", GenerateName: "", Namespace: "default", SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/e...+10 more", UID: "cadf9f8e-4355-453e-b249-4ed92abccc16", ResourceVersion: "1852450", Generation: 5, CreationTimestamp: k8s.io/apimachinery/pkg/apis/meta/v1.Time { Time: (*time.Time)(0x4000436088),}, DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil, DeletionGracePeriodSeconds: *int64 nil, Labels: map[string]string nil, Annotations: map[string]string nil, OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil, Finalizers: []string len: 0, cap: 0, nil, ClusterName: "", ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 2, cap: 2, [ (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x400032e3c0), (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x400032e420), ],} (dlv) p items[0].Spec 
k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec { DeploymentName: "example-foo", Replicas: *1,} (dlv) p items[0].Status k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus {AvailableReplicas: 1} (dlv)
(dlv) s I0709 15:25:24.036509 63308 controller.go:164] Starting workers I0709 15:25:24.036601 63308 controller.go:170] Started workers > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:464 (PC: 0xe16070) Warning: debugging optimized function 459: func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { 460: eventCount := 0 461: 462: // Stopping the watcher should be idempotent and if we return from this function there's no way 463: // we're coming back in with the same watch interface. => 464: defer w.Stop() 465: 466: loop: 467: for { 468: select { 469: case <-stopCh: (dlv) bt 0 0x0000000000e16070 in k8s.io/client-go/tools/cache.(*Reflector).watchHandler at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:464 1 0x0000000000e15824 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:428 2 0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221 3 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 4 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 5 0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220 6 0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm at 
/opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218 7 0x00000000006b8ce8 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56 8 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 9 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv)
// watchHandler drains events from the watch w and applies them to r.store
// (a *DeltaFIFO in the debugging session recorded in these notes), recording
// the resource version of each processed event in *resourceVersion.
//
// It returns:
//   - errorStopRequested when stopCh fires;
//   - the error received on errc;
//   - apierrors.FromObject(event.Object) for an event of type watch.Error;
//   - a "very short watch" error when the result channel closes within one
//     second of start and no event was processed;
//   - nil otherwise, once the result channel has been closed.
//
// Events whose Go type or GroupVersionKind does not match what the reflector
// expects are reported through utilruntime.HandleError and skipped.
func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error {
	eventCount := 0
	defer w.Stop() // stop the watch on every return path

loop:
	for {
		select {
		case <-stopCh:
			return errorStopRequested // stop was requested via stopCh
		case err := <-errc: // error channel
			return err
		case event, ok := <-w.ResultChan(): // next event from the watch's result channel
			if !ok { // result channel has been closed
				break loop // leave the for loop, not just the select
			}
			if event.Type == watch.Error { // the event itself carries an error
				return apierrors.FromObject(event.Object)
			}
			if r.expectedType != nil { // the reflector was configured with an expected resource type
				// Skip the event when the object's Go type differs from the expected type.
				if e, a := r.expectedType, reflect.TypeOf(event.Object); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected type %v, but watch event object had type %v", r.name, e, a))
					continue
				}
			}
			if r.expectedGVK != nil { // skip the event when its GroupVersionKind differs from the expected one
				if e, a := *r.expectedGVK, event.Object.GetObjectKind().GroupVersionKind(); e != a {
					utilruntime.HandleError(fmt.Errorf("%s: expected gvk %v, but watch event object had gvk %v", r.name, e, a))
					continue
				}
			}
			// From here on the local variable meta shadows the meta package.
			meta, err := meta.Accessor(event.Object)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
				continue
			}
			newResourceVersion := meta.GetResourceVersion()
			switch event.Type { // apply the matching add/update/delete to the store
			case watch.Added: // object created
				err := r.store.Add(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Modified:
				err := r.store.Update(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err))
				}
			case watch.Deleted:
				err := r.store.Delete(event.Object) // put the object into the DeltaFIFO
				if err != nil {
					utilruntime.HandleError(fmt.Errorf("%s: unable to delete watch event object (%#v) from store: %v", r.name, event.Object, err))
				}
			case watch.Bookmark:
				// A Bookmark means the watch has synced up to this point; only the
				// resource version below is updated.
			default:
				utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event))
			}
			// Store errors above are only reported; the resource version still advances.
			*resourceVersion = newResourceVersion
			r.setLastSyncResourceVersion(newResourceVersion)
			// NOTE(review): the upstream listing in the dlv session (reflector.go:524-526)
			// also calls rvu.UpdateResourceVersion here when r.store implements
			// ResourceVersionUpdater; this excerpt omits that step.
			eventCount++
		}
	}

	watchDuration := r.clock.Since(start)
	if watchDuration < 1*time.Second && eventCount == 0 {
		return fmt.Errorf("very short watch: %s: Unexpected watch close - watch lasted less than a second and no items received", r.name)
	}
	klog.V(4).Infof("%s: Watch close - %v total %v items received", r.name, r.expectedTypeName, eventCount)
	return nil
}
watch
客户端通过 request.Watch 发起 watch 请求时,会创建一个 watch 对象(StreamWatcher),并启动一个 goroutine 执行它的 receive 方法。
// NewStreamWatcher creates a StreamWatcher from the given decoder.
//
// The returned watcher's result channel is unbuffered, and sw.receive is
// already running in its own goroutine by the time the caller gets sw.
// receive itself is not shown in this excerpt; presumably it decodes events
// from source and sends them on result - confirm against the upstream source.
//
// NOTE(review): the StreamWatcher printed in the dlv session in these notes
// also has reporter and done fields, so this excerpt appears to come from a
// different apimachinery revision than the one being debugged.
func NewStreamWatcher(d Decoder) *StreamWatcher {
	sw := &StreamWatcher{
		source: d,
		// It's easy for a consumer to add buffering via an extra
		// goroutine/channel, but impossible for them to remove it,
		// so nonbuffered is better.
		result: make(chan Event),
	}
	go sw.receive()
	return sw
}
触发断点
root@ubuntu:~/sample-controller/artifacts/examples# kubectl apply -f example-foo.yaml foo.samplecontroller.k8s.io/example-foo created root@ubuntu:~/sample-controller/artifacts/examples# cd .
(dlv) b reflector.go:500 (dlv) > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(91):2 total:3) (PC: 0xe16408) Warning: debugging optimized function 495: continue 496: } 497: newResourceVersion := meta.GetResourceVersion() 498: switch event.Type { 499: case watch.Added: => 500: err := r.store.Add(event.Object) 501: if err != nil { 502: utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) 503: } 504: case watch.Modified: 505: err := r.store.Update(event.Object) (dlv) bt 0 0x0000000000e16408 in k8s.io/client-go/tools/cache.(*Reflector).watchHandler at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 1 0x0000000000e15824 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:428 2 0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221 3 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 4 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 5 0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220 6 0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218 7 0x00000000006b8ce8 in 
k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56 8 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 9 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) p r.store k8s.io/client-go/tools/cache.Store(*k8s.io/client-go/tools/cache.DeltaFIFO) *{ lock: sync.RWMutex { w: (*sync.Mutex)(0x400053c000), writerSem: 0, readerSem: 0, readerCount: 0, readerWait: 0,}, cond: sync.Cond { noCopy: sync.noCopy {}, L: sync.Locker(*sync.RWMutex) ..., notify: (*sync.notifyList)(0x400053c028), checker: 274883395656,}, items: map[string]k8s.io/client-go/tools/cache.Deltas [], queue: []string len: 0, cap: 2, [], populated: true, initialPopulationCount: 0, keyFunc: k8s.io/client-go/tools/cache.MetaNamespaceKeyFunc, knownObjects: k8s.io/client-go/tools/cache.KeyListerGetter(*k8s.io/client-go/tools/cache.cache) *{ cacheStorage: k8s.io/client-go/tools/cache.ThreadSafeStore(*k8s.io/client-go/tools/cache.threadSafeMap) ..., keyFunc: k8s.io/client-go/tools/cache.DeletionHandlingMetaNamespaceKeyFunc,}, closed: false, emitDeltaTypeReplaced: true,} (dlv) p w k8s.io/apimachinery/pkg/watch.Interface(*k8s.io/apimachinery/pkg/watch.StreamWatcher) *{ Mutex: sync.Mutex {state: 0, sema: 0}, source: k8s.io/apimachinery/pkg/watch.Decoder(*k8s.io/client-go/rest/watch.Decoder) *{ decoder: k8s.io/apimachinery/pkg/runtime/serializer/streaming.Decoder(*k8s.io/apimachinery/pkg/runtime/serializer/streaming.decoder) ..., embeddedDecoder: k8s.io/apimachinery/pkg/runtime.Decoder(k8s.io/apimachinery/pkg/runtime.WithoutVersionDecoder) *(*"k8s.io/apimachinery/pkg/runtime.Decoder")(0x40003f6010),}, reporter: k8s.io/apimachinery/pkg/watch.Reporter(*k8s.io/apimachinery/pkg/api/errors.ErrorReporter) *{ code: 500, verb: "GET", reason: 
"ClientWatchDecoding",}, result: chan k8s.io/apimachinery/pkg/watch.Event { qcount: 0, dataqsiz: 0, buf: *[0]k8s.io/apimachinery/pkg/watch.Event [], elemsize: 32, closed: 0, elemtype: *runtime._type {size: 32, ptrdata: 32, hash: 107209836, tflag: tflagUncommon|tflagExtraStar|tflagNamed (7), align: 8, fieldAlign: 8, kind: 25, equal: type..eq.k8s.io/apimachinery/pkg/watch.Event, gcdata: *9, str: 70259, ptrToThis: 1793344}, sendx: 0, recvx: 0, recvq: waitq<k8s.io/apimachinery/pkg/watch.Event> { first: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil, last: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil,}, sendq: waitq<k8s.io/apimachinery/pkg/watch.Event> { first: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil, last: *sudog<k8s.io/apimachinery/pkg/watch.Event> nil,}, lock: runtime.mutex {key: 0},}, done: chan struct {} { qcount: 0, dataqsiz: 0, buf: *[0]struct struct {} [], elemsize: 0, closed: 0, elemtype: *runtime._type {size: 0, ptrdata: 0, hash: 670477339, tflag: tflagExtraStar|tflagRegularMemory (10), align: 1, fieldAlign: 1, kind: 25, equal: runtime.memequal0, gcdata: *1, str: 47516, ptrToThis: 653632}, sendx: 0, recvx: 0, recvq: waitq<struct {}> { first: *sudog<struct {}> nil, last: *sudog<struct {}> nil,}, sendq: waitq<struct {}> { first: *sudog<struct {}> nil, last: *sudog<struct {}> nil,}, lock: runtime.mutex {key: 0},},} (dlv)
(dlv) p event Command failed: could not find symbol value for event (dlv) p *event Command failed: could not find symbol value for event (dlv) p event.Object Command failed: could not find symbol value for event (dlv) n > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:522 (PC: 0xe16444) Warning: debugging optimized function 517: case watch.Bookmark: 518: // A `Bookmark` means watch has synced here, just update the resourceVersion 519: default: 520: utilruntime.HandleError(fmt.Errorf("%s: unable to understand watch event %#v", r.name, event)) 521: } => 522: *resourceVersion = newResourceVersion 523: r.setLastSyncResourceVersion(newResourceVersion) 524: if rvu, ok := r.store.(ResourceVersionUpdater); ok { 525: rvu.UpdateResourceVersion(newResourceVersion) 526: } 527: eventCount++ (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(91):3 total:5) (PC: 0xe16408) Warning: debugging optimized function 495: continue 496: } 497: newResourceVersion := meta.GetResourceVersion() 498: switch event.Type { 499: case watch.Added: => 500: err := r.store.Add(event.Object) 501: if err != nil { 502: utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) 503: } 504: case watch.Modified: 505: err := r.store.Update(event.Object) (dlv) s > k8s.io/client-go/tools/cache.(*DeltaFIFO).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:277 (PC: 0xe0f8a8) Warning: debugging optimized function 272: return f.populated && f.initialPopulationCount == 0 273: } 274: 275: // Add inserts an item, and puts it in the queue. The item is only enqueued 276: // if it doesn't already exist in the set. 
=> 277: func (f *DeltaFIFO) Add(obj interface{}) error { 278: f.lock.Lock() 279: defer f.lock.Unlock() 280: f.populated = true 281: return f.queueActionLocked(Added, obj) 282: } (dlv) n
(dlv) b reflector.go:509
触发delete
root@ubuntu:~/sample-controller/artifacts/examples# kubectl delete -f example-foo.yaml foo.samplecontroller.k8s.io "example-foo" deleted root@ubuntu:~/sample-controller/artifacts/examples#
I0709 18:45:53.458531 63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):1 total:1) (PC: 0xe16674) Warning: debugging optimized function 504: case watch.Modified: 505: err := r.store.Update(event.Object) 506: if err != nil { 507: utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 508: } => 509: case watch.Deleted: 510: // TODO: Will any consumers need access to the "last known 511: // state", which is passed in event.Object? If so, may need 512: // to change this. 513: err := r.store.Delete(event.Object) 514: if err != nil { (dlv) bt 0 0x0000000000e16674 in k8s.io/client-go/tools/cache.(*Reflector).watchHandler at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 1 0x0000000000e15824 in k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:428 2 0x0000000000e1dbac in k8s.io/client-go/tools/cache.(*Reflector).Run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:221 3 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 4 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 5 0x0000000000e14fc8 in k8s.io/client-go/tools/cache.(*Reflector).Run at 
/opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:220 6 0x0000000000e20a48 in k8s.io/client-go/tools/cache.(*Reflector).Run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:218 7 0x00000000006b8ce8 in k8s.io/apimachinery/pkg/util/wait.(*Group).StartWithChannel.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:56 8 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 9 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):1 total:2) (PC: 0xe16674) Warning: debugging optimized function 504: case watch.Modified: 505: err := r.store.Update(event.Object) 506: if err != nil { 507: utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 508: } => 509: case watch.Deleted: 510: // TODO: Will any consumers need access to the "last known 511: // state", which is passed in event.Object? If so, may need 512: // to change this. 513: err := r.store.Delete(event.Object) 514: if err != nil { (dlv) p event.Object Command failed: could not find symbol value for event (dlv) c
图中 3->4->5的流程
(dlv) c E0709 18:53:48.900792 63308 controller.go:257] foo 'default/example-foo' in work queue no longer exists I0709 18:53:48.900868 63308 controller.go:228] Successfully synced 'default/example-foo' > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(102):3 total:6) (PC: 0xe16408) Warning: debugging optimized function 495: continue 496: } 497: newResourceVersion := meta.GetResourceVersion() 498: switch event.Type { 499: case watch.Added: => 500: err := r.store.Add(event.Object) 501: if err != nil { 502: utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) 503: } 504: case watch.Modified: 505: err := r.store.Update(event.Object) (dlv) b thread_safe_store.go:68 Command failed: could not find statement at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:68, please use a line with a statement (dlv) b thread_safe_store.Add Command failed: location "thread_safe_store.Add" not found (dlv) b ThreadSafeStore.Add Command failed: location "ThreadSafeStore.Add" not found (dlv) b thread_safe_store.go:73 Breakpoint 5 (enabled) set at 0xe1b6b8 for k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (dlv) c > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(49):1 total:1) (PC: 0xe1b6b8) Warning: debugging optimized function 68: indexers Indexers 69: // indices maps a name to an Index 70: indices Indices 71: } 72: => 73: func (c *threadSafeMap) Add(key string, obj interface{}) { 74: c.lock.Lock() 75: defer c.lock.Unlock() 76: oldObject := c.items[key] 77: c.items[key] = obj 78: c.updateIndices(oldObject, 
obj, key) (dlv) bt 0 0x0000000000e1b6b8 in k8s.io/client-go/tools/cache.(*threadSafeMap).Add at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 1 0x0000000000e1ad30 in k8s.io/client-go/tools/cache.(*cache).Add at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/store.go:155 2 0x0000000000e191bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:557 3 0x0000000000e20b80 in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:527 4 0x0000000000e1150c in k8s.io/client-go/tools/cache.(*DeltaFIFO).Pop at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:539 5 0x0000000000e0f354 in k8s.io/client-go/tools/cache.(*controller).processLoop at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:183 6 0x0000000000e20a90 in k8s.io/client-go/tools/cache.(*controller).processLoop-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:181 7 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 8 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 9 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 10 0x0000000000e0f0e8 in k8s.io/apimachinery/pkg/util/wait.Until at 
/opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 11 0x0000000000e0f0e8 in k8s.io/client-go/tools/cache.(*controller).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:154 12 0x0000000000e180bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:410 13 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv)
(dlv) s > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:74 (PC: 0xe1b6d0) Warning: debugging optimized function 69: // indices maps a name to an Index 70: indices Indices 71: } 72: 73: func (c *threadSafeMap) Add(key string, obj interface{}) { => 74: c.lock.Lock() 75: defer c.lock.Unlock() 76: oldObject := c.items[key] 77: c.items[key] = obj 78: c.updateIndices(oldObject, obj, key) 79: } (dlv) p key "default/example-foo" (dlv) c.items Command failed: command not available (dlv) p c.items map[string]interface {} [] (dlv) p c *k8s.io/client-go/tools/cache.threadSafeMap { lock: sync.RWMutex { w: (*sync.Mutex)(0x40001fd350), writerSem: 0, readerSem: 0, readerCount: 0, readerWait: 0,}, items: map[string]interface {} [], indexers: k8s.io/client-go/tools/cache.Indexers [ "namespace": k8s.io/client-go/tools/cache.MetaNamespaceIndexFunc, ], indices: k8s.io/client-go/tools/cache.Indices [ "namespace": [], ],} (dlv) n W0709 18:58:10.290090 63308 reflector.go:441] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: watch of *v1.Deployment ended with: an error on the server ("unable to decode an event from the watch stream: http2: client connection lost") has prevented the request from succeeding W0709 18:58:10.290092 63308 reflector.go:441] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: watch of *v1alpha1.Foo ended with: an error on the server ("unable to decode an event from the watch stream: http2: client connection lost") has prevented the request from succeeding > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:75 (PC: 0xe1b6e0) Warning: debugging optimized function 70: indices Indices 71: } 72: 73: func (c *threadSafeMap) Add(key string, obj interface{}) { 74: c.lock.Lock() => 
75: defer c.lock.Unlock() 76: oldObject := c.items[key] 77: c.items[key] = obj 78: c.updateIndices(oldObject, obj, key) 79: } 80: (dlv) n > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:76 (PC: 0xe1b6fc) Warning: debugging optimized function 71: } 72: 73: func (c *threadSafeMap) Add(key string, obj interface{}) { 74: c.lock.Lock() 75: defer c.lock.Unlock() => 76: oldObject := c.items[key] 77: c.items[key] = obj 78: c.updateIndices(oldObject, obj, key) 79: } 80: 81: func (c *threadSafeMap) Update(key string, obj interface{}) { (dlv) n > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:77 (PC: 0xe1b738) Warning: debugging optimized function 72: 73: func (c *threadSafeMap) Add(key string, obj interface{}) { 74: c.lock.Lock() 75: defer c.lock.Unlock() 76: oldObject := c.items[key] => 77: c.items[key] = obj 78: c.updateIndices(oldObject, obj, key) 79: } 80: 81: func (c *threadSafeMap) Update(key string, obj interface{}) { 82: c.lock.Lock() (dlv) p obj interface {}(*k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo) *{ TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta {Kind: "", APIVersion: ""}, ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta { Name: "example-foo", GenerateName: "", Namespace: "default", SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/e...+10 more", UID: "1917188d-93f5-4f67-8a52-d07366704dde", ResourceVersion: "1889422", Generation: 1, CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x4000d22308), DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil, DeletionGracePeriodSeconds: *int64 nil, Labels: map[string]string nil, Annotations: map[string]string [...], OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil, 
Finalizers: []string len: 0, cap: 0, nil, ClusterName: "", ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 1, cap: 1, [ (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x4000190240), ],}, Spec: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec { DeploymentName: "example-foo", Replicas: *1,}, Status: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus {AvailableReplicas: 0},} (dlv)
(dlv) bt 0 0x0000000000e1b738 in k8s.io/client-go/tools/cache.(*threadSafeMap).Add at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:77 1 0x0000000000e1ad30 in k8s.io/client-go/tools/cache.(*cache).Add at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/store.go:155 2 0x0000000000e191bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:557 3 0x0000000000e20b80 in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:527 4 0x0000000000e1150c in k8s.io/client-go/tools/cache.(*DeltaFIFO).Pop at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:539 5 0x0000000000e0f354 in k8s.io/client-go/tools/cache.(*controller).processLoop at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:183 6 0x0000000000e20a90 in k8s.io/client-go/tools/cache.(*controller).processLoop-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:181 7 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 8 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 9 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 10 0x0000000000e0f0e8 in k8s.io/apimachinery/pkg/util/wait.Until at 
/opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 11 0x0000000000e0f0e8 in k8s.io/client-go/tools/cache.(*controller).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:154 12 0x0000000000e180bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:410 13 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv)
删除 Foo 资源(触发 Reflector.watchHandler 中的 watch.Deleted 分支)
> k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):3 total:5) (PC: 0xe16674) Warning: debugging optimized function 504: case watch.Modified: 505: err := r.store.Update(event.Object) 506: if err != nil { 507: utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 508: } => 509: case watch.Deleted: 510: // TODO: Will any consumers need access to the "last known 511: // state", which is passed in event.Object? If so, may need 512: // to change this. 513: err := r.store.Delete(event.Object) 514: if err != nil { (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):3 total:6) (PC: 0xe16674) Warning: debugging optimized function > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):8 total:10) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) c E0709 19:07:41.884046 63308 controller.go:257] foo 'default/example-foo' in work queue no longer exists I0709 19:07:41.884085 63308 controller.go:228] Successfully synced 'default/example-foo'
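图中 6 -> 7 的流程(processorListener 调用 ResourceEventHandler 的 OnAdd/OnUpdate,再由 enqueueFoo 将对象的 namespace/name key 放入 workqueue)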
(dlv) b enqueueFoo Breakpoint 6 (enabled) set at 0xee9948 for main.(*Controller).enqueueFoo() ./controller.go:338 (dlv) c > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):1 total:1) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) bt 0 0x0000000000ee9948 in main.(*Controller).enqueueFoo at ./controller.go:338 1 0x0000000000eeb1d0 in main.(*Controller).enqueueFoo-fm at ./controller.go:338 2 0x0000000000e20dc0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnAdd at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:231 3 0x0000000000e1f2a0 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:777 4 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 5 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 6 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 7 0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 8 0x0000000000e1a4c8 in 
k8s.io/client-go/tools/cache.(*processorListener).run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771 9 0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765 10 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 11 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) b AddAfter Command failed: Location "AddAfter" ambiguous: k8s.io/client-go/util/workqueue.(*delayingType).AddAfter, k8s.io/client-go/util/workqueue.(*rateLimitingType).AddAfter, k8s.io/client-go/util/workqueue.rateLimitingType.AddAfter… (dlv) b k8s.io/client-go/util/workqueue.(*delayingType).AddAfter Breakpoint 7 (enabled) set at 0xc24328 for k8s.io/client-go/util/workqueue.(*delayingType).AddAfter() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/delaying_queue.go:160 (dlv) c I0709 19:03:25.953479 63308 trace.go:205] Trace[1225511528]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167 (09-Jul-2021 18:58:12.969) (total time: 243729ms): Trace[1225511528]: [4m3.72932702s] [4m3.72932702s] END E0709 19:03:25.953605 63308 reflector.go:138] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: Failed to watch *v1alpha1.Foo: failed to list *v1alpha1.Foo: Get "https://10.10.16.249:6443/apis/samplecontroller.k8s.io/v1alpha1/foos?resourceVersion=1889422": net/http: TLS handshake timeout > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):2 total:2) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: 
// string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) btt Command failed: command not available (dlv) bt 0 0x0000000000ee9948 in main.(*Controller).enqueueFoo at ./controller.go:338 1 0x0000000000eeac70 in main.NewController.func1 at ./controller.go:120 2 0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238 3 0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775 4 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 5 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 6 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 7 0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 8 0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771 9 0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm at 
/opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765 10 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 11 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) c I0709 19:03:25.953631 63308 trace.go:205] Trace[1852186258]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167 (09-Jul-2021 18:58:12.970) (total time: 312982ms): Trace[1852186258]: [5m12.982727975s] [5m12.982727975s] END E0709 19:03:38.269089 63308 reflector.go:138] pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167: Failed to watch *v1.Deployment: failed to list *v1.Deployment: Get "https://10.10.16.249:6443/apis/apps/v1/deployments?resourceVersion=1889388": net/http: TLS handshake timeout > k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 (hits goroutine(102):4 total:5) (PC: 0xe1e51c) Warning: debugging optimized function 343: if err != nil { 344: return fmt.Errorf("unable to understand list result %#v: %v", list, err) 345: } 346: resourceVersion = listMetaInterface.GetResourceVersion() 347: initTrace.Step("Resource version extracted") => 348: items, err := meta.ExtractList(list) 349: if err != nil { 350: return fmt.Errorf("unable to understand list result %#v (%v)", list, err) 351: } 352: initTrace.Step("Objects extracted") 353: if err := r.syncWith(items, resourceVersion); err != nil { (dlv) c > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):3 total:3) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work 
queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) bt 0 0x0000000000ee9948 in main.(*Controller).enqueueFoo at ./controller.go:338 1 0x0000000000eeac70 in main.NewController.func1 at ./controller.go:120 2 0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238 3 0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775 4 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 5 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 6 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 7 0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 8 0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771 9 0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765 10 0x00000000006b8dac in 
k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 11 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(102):6 total:11) (PC: 0xe16048) Warning: debugging optimized function 454: } 455: return r.store.Replace(found, resourceVersion) 456: } 457: 458: // watchHandler watches w and keeps *resourceVersion up to date. => 459: func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { 460: eventCount := 0 461: 462: // Stopping the watcher should be idempotent and if we return from this function there's no way 463: // we're coming back in with the same watch interface. 464: defer w.Stop() (dlv) c I0709 19:03:59.868255 63308 controller.go:228] Successfully synced 'default/example-foo' > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):4 total:4) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. 
=> 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) bt 0 0x0000000000ee9948 in main.(*Controller).enqueueFoo at ./controller.go:338 1 0x0000000000eeac70 in main.NewController.func1 at ./controller.go:120 2 0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238 3 0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775 4 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 5 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 6 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 7 0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 8 0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771 9 0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765 10 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at 
/opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 11 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) c I0709 19:04:13.265775 63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" > k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch.func1() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:348 (hits goroutine(91):2 total:6) (PC: 0xe1e51c) Warning: debugging optimized function 343: if err != nil { 344: return fmt.Errorf("unable to understand list result %#v: %v", list, err) 345: } 346: resourceVersion = listMetaInterface.GetResourceVersion() 347: initTrace.Step("Resource version extracted") => 348: items, err := meta.ExtractList(list) 349: if err != nil { 350: return fmt.Errorf("unable to understand list result %#v (%v)", list, err) 351: } 352: initTrace.Step("Objects extracted") 353: if err := r.syncWith(items, resourceVersion); err != nil { (dlv) c I0709 19:04:21.857750 63308 trace.go:205] Trace[443632888]: "Reflector ListAndWatch" name:pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:167 (09-Jul-2021 19:03:47.178) (total time: 34678ms): Trace[443632888]: ---"Objects listed" 26089ms (19:04:00.268) Trace[443632888]: ---"Objects extracted" 8589ms (19:04:00.857) Trace[443632888]: [34.678769477s] [34.678769477s] END > k8s.io/client-go/util/workqueue.(*delayingType).AddAfter() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/delaying_queue.go:160 (hits goroutine(123):1 total:1) (PC: 0xc24328) Warning: debugging optimized function > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(91):6 total:12) (PC: 
0xe16048) Warning: debugging optimized function > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(48):1 total:2) (PC: 0xe1b6b8) Warning: debugging optimized function > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):5 total:5) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) bt 0 0x0000000000ee9948 in main.(*Controller).enqueueFoo at ./controller.go:338 1 0x0000000000eeac70 in main.NewController.func1 at ./controller.go:120 2 0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238 3 0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775 4 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 5 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 6 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 7 0x0000000000e1a4c8 in 
k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 8 0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771 9 0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765 10 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 11 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) c > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(130):1 total:6) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) c E0709 19:04:30.134547 63308 controller.go:233] error syncing 'default/example-foo': deployments.apps "example-foo" already exists, requeuing > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(130):2 total:7) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. 
=> 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) bt 0 0x0000000000ee9948 in main.(*Controller).enqueueFoo at ./controller.go:338 1 0x0000000000ee9d60 in main.(*Controller).handleObject at ./controller.go:383 2 0x0000000000eeacf8 in main.NewController.func2 at ./controller.go:139 3 0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238 4 0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775 5 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 6 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 7 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 8 0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 9 0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771 10 0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765 11 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at 
/opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 12 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) c > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):6 total:8) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) bt 0 0x0000000000ee9948 in main.(*Controller).enqueueFoo at ./controller.go:338 1 0x0000000000eeac70 in main.NewController.func1 at ./controller.go:120 2 0x0000000000e20ef0 in k8s.io/client-go/tools/cache.(*ResourceEventHandlerFuncs).OnUpdate at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:238 3 0x0000000000e1f380 in k8s.io/client-go/tools/cache.(*processorListener).run.func1 at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:775 4 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 5 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 6 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 7 0x0000000000e1a4c8 in k8s.io/apimachinery/pkg/util/wait.Until at 
/opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 8 0x0000000000e1a4c8 in k8s.io/client-go/tools/cache.(*processorListener).run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:771 9 0x0000000000e20c70 in k8s.io/client-go/tools/cache.(*processorListener).run-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:765 10 0x00000000006b8dac in k8s.io/apimachinery/pkg/util/wait.(*Group).Start.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:73 11 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv)
(dlv) c I0709 19:11:07.684833 63308 controller.go:228] Successfully synced 'default/example-foo' I0709 19:11:07.685614 63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" I0709 19:11:07.690807 63308 controller.go:228] Successfully synced 'default/example-foo' I0709 19:11:07.690870 63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):4 total:7) (PC: 0xe16674) Warning: debugging optimized function 504: case watch.Modified: 505: err := r.store.Update(event.Object) 506: if err != nil { 507: utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 508: } => 509: case watch.Deleted: 510: // TODO: Will any consumers need access to the "last known 511: // state", which is passed in event.Object? If so, may need 512: // to change this. 513: err := r.store.Delete(event.Object) 514: if err != nil { (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):4 total:8) (PC: 0xe16674) Warning: debugging optimized function 504: case watch.Modified: 505: err := r.store.Update(event.Object) 506: if err != nil { 507: utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 508: } => 509: case watch.Deleted: 510: // TODO: Will any consumers need access to the "last known 511: // state", which is passed in event.Object? If so, may need 512: // to change this. 
513: err := r.store.Delete(event.Object) 514: if err != nil { (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(102):5 total:9) (PC: 0xe16408) Warning: debugging optimized function 495: continue 496: } 497: newResourceVersion := meta.GetResourceVersion() 498: switch event.Type { 499: case watch.Added: => 500: err := r.store.Add(event.Object) 501: if err != nil { 502: utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) 503: } 504: case watch.Modified: 505: err := r.store.Update(event.Object) (dlv) c > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(49):3 total:5) (PC: 0xe1b6b8) Warning: debugging optimized function 68: indexers Indexers 69: // indices maps a name to an Index 70: indices Indices 71: } 72: => 73: func (c *threadSafeMap) Add(key string, obj interface{}) { 74: c.lock.Lock() 75: defer c.lock.Unlock() 76: oldObject := c.items[key] 77: c.items[key] = obj 78: c.updateIndices(oldObject, obj, key) (dlv) c > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):15 total:22) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) s > main.(*Controller).enqueueFoo() ./controller.go:341 (PC: 0xee9954) Warning: debugging optimized function 336: // string which is then put onto the work queue. 
This method should *not* be 337: // passed resources of any type other than Foo. 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error => 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return 344: } 345: c.workqueue.Add(key) 346: } (dlv) n > main.(*Controller).enqueueFoo() ./controller.go:345 (PC: 0xee9994) Warning: debugging optimized function 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return 344: } => 345: c.workqueue.Add(key) 346: } 347: 348: // handleObject will take any resource implementing metav1.Object and attempt 349: // to find the Foo resource that 'owns' it. It does this by looking at the 350: // objects metadata.ownerReferences field for an appropriate OwnerReference. (dlv) s > k8s.io/client-go/util/workqueue.(*rateLimitingType).Add() <autogenerated>:1 (PC: 0xc26e88) Warning: debugging optimized function (dlv) n > main.(*Controller).enqueueFoo() ./controller.go:346 (PC: 0xee99c4) Warning: debugging optimized function Values returned: 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return 344: } 345: c.workqueue.Add(key) => 346: } 347: 348: // handleObject will take any resource implementing metav1.Object and attempt 349: // to find the Foo resource that 'owns' it. It does this by looking at the 350: // objects metadata.ownerReferences field for an appropriate OwnerReference. 351: // It then enqueues that Foo resource to be processed. 
If the object does not (dlv) p c.workqueue k8s.io/client-go/util/workqueue.RateLimitingInterface(*k8s.io/client-go/util/workqueue.rateLimitingType) *{ DelayingInterface: k8s.io/client-go/util/workqueue.DelayingInterface(*k8s.io/client-go/util/workqueue.delayingType) *{ Interface: k8s.io/client-go/util/workqueue.Interface(*k8s.io/client-go/util/workqueue.Type) ..., clock: k8s.io/apimachinery/pkg/util/clock.Clock(k8s.io/apimachinery/pkg/util/clock.RealClock) *(*"k8s.io/apimachinery/pkg/util/clock.Clock")(0x40003c3390), stopCh: chan struct {} { qcount: 0, dataqsiz: 0, buf: *[0]struct struct {} [], elemsize: 0, closed: 0, elemtype: *runtime._type {size: 0, ptrdata: 0, hash: 670477339, tflag: tflagExtraStar|tflagRegularMemory (10), align: 1, fieldAlign: 1, kind: 25, equal: runtime.memequal0, gcdata: *1, str: 47516, ptrToThis: 653632}, sendx: 0, recvx: 0, recvq: waitq<struct {}> { first: *(*sudog<struct {}>)(0x4000423320), last: *(*sudog<struct {}>)(0x4000423320),}, sendq: waitq<struct {}> { first: *sudog<struct {}> nil, last: *sudog<struct {}> nil,}, lock: runtime.mutex {key: 0},}, stopOnce: (*sync.Once)(0x40003c33a8), heartbeat: k8s.io/apimachinery/pkg/util/clock.Ticker(*k8s.io/apimachinery/pkg/util/clock.realTicker) ..., waitingForAddCh: chan *k8s.io/client-go/util/workqueue.waitFor { qcount: 0, dataqsiz: 1000, buf: *[1000]*k8s.io/client-go/util/workqueue.waitFor [ *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, ...+936 more ], elemsize: 8, closed: 0, elemtype: *runtime._type {size: 8, ptrdata: 8, hash: 3317760887, tflag: tflagRegularMemory (8), align: 8, fieldAlign: 8, kind: 54, equal: runtime.memequal64, gcdata: *1, str: 144019, 
ptrToThis: 0}, sendx: 0, recvx: 0, recvq: waitq<*k8s.io/client-go/util/workqueue.waitFor> { first: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200), last: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200),}, sendq: waitq<*k8s.io/client-go/util/workqueue.waitFor> { first: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil, last: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil,}, lock: runtime.mutex {key: 0},}, metrics: k8s.io/client-go/util/workqueue.retryMetrics(*k8s.io/client-go/util/workqueue.defaultRetryMetrics) ...,}, rateLimiter: k8s.io/client-go/util/workqueue.RateLimiter(*k8s.io/client-go/util/workqueue.MaxOfRateLimiter) *{ limiters: []k8s.io/client-go/util/workqueue.RateLimiter len: 2, cap: 2, [ ..., ..., ],},} (dlv)
workqueue
(dlv) c I0709 19:11:07.684833 63308 controller.go:228] Successfully synced 'default/example-foo' I0709 19:11:07.685614 63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" I0709 19:11:07.690807 63308 controller.go:228] Successfully synced 'default/example-foo' I0709 19:11:07.690870 63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(102):4 total:7) (PC: 0xe16674) Warning: debugging optimized function 504: case watch.Modified: 505: err := r.store.Update(event.Object) 506: if err != nil { 507: utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 508: } => 509: case watch.Deleted: 510: // TODO: Will any consumers need access to the "last known 511: // state", which is passed in event.Object? If so, may need 512: // to change this. 513: err := r.store.Delete(event.Object) 514: if err != nil { (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:509 (hits goroutine(91):4 total:8) (PC: 0xe16674) Warning: debugging optimized function 504: case watch.Modified: 505: err := r.store.Update(event.Object) 506: if err != nil { 507: utilruntime.HandleError(fmt.Errorf("%s: unable to update watch event object (%#v) to store: %v", r.name, event.Object, err)) 508: } => 509: case watch.Deleted: 510: // TODO: Will any consumers need access to the "last known 511: // state", which is passed in event.Object? If so, may need 512: // to change this. 
513: err := r.store.Delete(event.Object) 514: if err != nil { (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(102):5 total:9) (PC: 0xe16408) Warning: debugging optimized function 495: continue 496: } 497: newResourceVersion := meta.GetResourceVersion() 498: switch event.Type { 499: case watch.Added: => 500: err := r.store.Add(event.Object) 501: if err != nil { 502: utilruntime.HandleError(fmt.Errorf("%s: unable to add watch event object (%#v) to store: %v", r.name, event.Object, err)) 503: } 504: case watch.Modified: 505: err := r.store.Update(event.Object) (dlv) c > k8s.io/client-go/tools/cache.(*threadSafeMap).Add() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:73 (hits goroutine(49):3 total:5) (PC: 0xe1b6b8) Warning: debugging optimized function 68: indexers Indexers 69: // indices maps a name to an Index 70: indices Indices 71: } 72: => 73: func (c *threadSafeMap) Add(key string, obj interface{}) { 74: c.lock.Lock() 75: defer c.lock.Unlock() 76: oldObject := c.items[key] 77: c.items[key] = obj 78: c.updateIndices(oldObject, obj, key) (dlv) c > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):15 total:22) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) s > main.(*Controller).enqueueFoo() ./controller.go:341 (PC: 0xee9954) Warning: debugging optimized function 336: // string which is then put onto the work queue. 
This method should *not* be 337: // passed resources of any type other than Foo. 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error => 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return 344: } 345: c.workqueue.Add(key) 346: } (dlv) n > main.(*Controller).enqueueFoo() ./controller.go:345 (PC: 0xee9994) Warning: debugging optimized function 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return 344: } => 345: c.workqueue.Add(key) 346: } 347: 348: // handleObject will take any resource implementing metav1.Object and attempt 349: // to find the Foo resource that 'owns' it. It does this by looking at the 350: // objects metadata.ownerReferences field for an appropriate OwnerReference. (dlv) s > k8s.io/client-go/util/workqueue.(*rateLimitingType).Add() <autogenerated>:1 (PC: 0xc26e88) Warning: debugging optimized function (dlv) n > main.(*Controller).enqueueFoo() ./controller.go:346 (PC: 0xee99c4) Warning: debugging optimized function Values returned: 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return 344: } 345: c.workqueue.Add(key) => 346: } 347: 348: // handleObject will take any resource implementing metav1.Object and attempt 349: // to find the Foo resource that 'owns' it. It does this by looking at the 350: // objects metadata.ownerReferences field for an appropriate OwnerReference. 351: // It then enqueues that Foo resource to be processed. 
If the object does not (dlv) p c.workqueue k8s.io/client-go/util/workqueue.RateLimitingInterface(*k8s.io/client-go/util/workqueue.rateLimitingType) *{ DelayingInterface: k8s.io/client-go/util/workqueue.DelayingInterface(*k8s.io/client-go/util/workqueue.delayingType) *{ Interface: k8s.io/client-go/util/workqueue.Interface(*k8s.io/client-go/util/workqueue.Type) ..., clock: k8s.io/apimachinery/pkg/util/clock.Clock(k8s.io/apimachinery/pkg/util/clock.RealClock) *(*"k8s.io/apimachinery/pkg/util/clock.Clock")(0x40003c3390), stopCh: chan struct {} { qcount: 0, dataqsiz: 0, buf: *[0]struct struct {} [], elemsize: 0, closed: 0, elemtype: *runtime._type {size: 0, ptrdata: 0, hash: 670477339, tflag: tflagExtraStar|tflagRegularMemory (10), align: 1, fieldAlign: 1, kind: 25, equal: runtime.memequal0, gcdata: *1, str: 47516, ptrToThis: 653632}, sendx: 0, recvx: 0, recvq: waitq<struct {}> { first: *(*sudog<struct {}>)(0x4000423320), last: *(*sudog<struct {}>)(0x4000423320),}, sendq: waitq<struct {}> { first: *sudog<struct {}> nil, last: *sudog<struct {}> nil,}, lock: runtime.mutex {key: 0},}, stopOnce: (*sync.Once)(0x40003c33a8), heartbeat: k8s.io/apimachinery/pkg/util/clock.Ticker(*k8s.io/apimachinery/pkg/util/clock.realTicker) ..., waitingForAddCh: chan *k8s.io/client-go/util/workqueue.waitFor { qcount: 0, dataqsiz: 1000, buf: *[1000]*k8s.io/client-go/util/workqueue.waitFor [ *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, *nil, ...+936 more ], elemsize: 8, closed: 0, elemtype: *runtime._type {size: 8, ptrdata: 8, hash: 3317760887, tflag: tflagRegularMemory (8), align: 8, fieldAlign: 8, kind: 54, equal: runtime.memequal64, gcdata: *1, str: 144019, 
ptrToThis: 0}, sendx: 0, recvx: 0, recvq: waitq<*k8s.io/client-go/util/workqueue.waitFor> { first: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200), last: *(*"sudog<*k8s.io/client-go/util/workqueue.waitFor>")(0x4000423200),}, sendq: waitq<*k8s.io/client-go/util/workqueue.waitFor> { first: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil, last: *sudog<*k8s.io/client-go/util/workqueue.waitFor> nil,}, lock: runtime.mutex {key: 0},}, metrics: k8s.io/client-go/util/workqueue.retryMetrics(*k8s.io/client-go/util/workqueue.defaultRetryMetrics) ...,}, rateLimiter: k8s.io/client-go/util/workqueue.RateLimiter(*k8s.io/client-go/util/workqueue.MaxOfRateLimiter) *{ limiters: []k8s.io/client-go/util/workqueue.RateLimiter len: 2, cap: 2, [ ..., ..., ],},} (dlv)
图中8->9
workqueue
(dlv) b queue.go:152 Command failed: could not find statement at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:152, please use a line with a statement (dlv) b queue.go:147 Breakpoint 9 (enabled) set at 0xc25e48 for k8s.io/client-go/util/workqueue.(*Type).Get() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:147 (dlv) c > main.(*Controller).enqueueFoo() ./controller.go:338 (hits goroutine(74):16 total:23) (PC: 0xee9948) Warning: debugging optimized function 333: } 334: 335: // enqueueFoo takes a Foo resource and converts it into a namespace/name 336: // string which is then put onto the work queue. This method should *not* be 337: // passed resources of any type other than Foo. => 338: func (c *Controller) enqueueFoo(obj interface{}) { 339: var key string 340: var err error 341: if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil { 342: utilruntime.HandleError(err) 343: return (dlv) c > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(102):7 total:14) (PC: 0xe16048) Warning: debugging optimized function > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:459 (hits goroutine(91):7 total:14) (PC: 0xe16048) Warning: debugging optimized function 454: } 455: return r.store.Replace(found, resourceVersion) 456: } 457: 458: // watchHandler watches w and keeps *resourceVersion up to date. => 459: func (r *Reflector) watchHandler(start time.Time, w watch.Interface, resourceVersion *string, errc chan error, stopCh <-chan struct{}) error { 460: eventCount := 0 461: 462: // Stopping the watcher should be idempotent and if we return from this function there's no way 463: // we're coming back in with the same watch interface. 
464: defer w.Stop() (dlv) c I0709 19:14:30.567295 63308 controller.go:228] Successfully synced 'default/example-foo' I0709 19:14:30.567503 63308 event.go:291] "Event occurred" object="default/example-foo" kind="Foo" apiVersion="samplecontroller.k8s.io/v1alpha1" type="Normal" reason="Synced" message="Foo synced successfully" > k8s.io/client-go/tools/cache.(*Reflector).watchHandler() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/reflector.go:500 (hits goroutine(91):5 total:10) (PC: 0xe16408) Warning: debugging optimized function > k8s.io/client-go/util/workqueue.(*Type).Get() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:147 (hits goroutine(124):1 total:1) (PC: 0xc25e48) Warning: debugging optimized function 142: } 143: 144: // Get blocks until it can return an item to be processed. If shutdown = true, 145: // the caller should end their goroutine. You must call Done with item when you 146: // have finished processing it. 
=> 147: func (q *Type) Get() (item interface{}, shutdown bool) { 148: q.cond.L.Lock() 149: defer q.cond.L.Unlock() 150: for len(q.queue) == 0 && !q.shuttingDown { 151: q.cond.Wait() 152: } (dlv) bt 0 0x0000000000c25e48 in k8s.io/client-go/util/workqueue.(*Type).Get at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/util/workqueue/queue.go:147 1 0x0000000000c26a54 in k8s.io/client-go/util/workqueue.(*delayingType).Get at <autogenerated>:1 2 0x0000000000c27044 in k8s.io/client-go/util/workqueue.(*rateLimitingType).Get at <autogenerated>:1 3 0x0000000000ee8e5c in main.(*Controller).processNextWorkItem at ./controller.go:188 4 0x0000000000ee8e00 in main.(*Controller).runWorker at ./controller.go:181 5 0x0000000000eeb260 in main.(*Controller).runWorker-fm at ./controller.go:180 6 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 7 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 8 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 9 0x00000000006b7d80 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 10 0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv)
updateIndices
root@ubuntu:~/sample-controller# dlv exec ./sample-controller -- --kubeconfig=$HOME/.kube/config Type 'help' for list of commands. (dlv) b updateIndices Breakpoint 1 (enabled) set at 0xe1d248 for k8s.io/client-go/tools/cache.(*threadSafeMap).updateIndices() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:256 (dlv) c I0709 19:30:20.770701 32518 controller.go:115] Setting up event handlers I0709 19:30:20.770868 32518 controller.go:156] Starting Foo controller I0709 19:30:20.770882 32518 controller.go:159] Waiting for informer caches to sync > k8s.io/client-go/tools/cache.(*threadSafeMap).updateIndices() /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:256 (hits goroutine(105):1 total:1) (PC: 0xe1d248) Warning: debugging optimized function 251: return nil 252: } 253: 254: // updateIndices modifies the objects location in the managed indexes, if this is an update, you must provide an oldObj 255: // updateIndices must be called from a function that already has a lock on the cache => 256: func (c *threadSafeMap) updateIndices(oldObj interface{}, newObj interface{}, key string) { 257: // if we got an old object, we need to remove it before we add it again 258: if oldObj != nil { 259: c.deleteFromIndices(oldObj, key) 260: } 261: for name, indexFunc := range c.indexers { (dlv) bt 0 0x0000000000e1d248 in k8s.io/client-go/tools/cache.(*threadSafeMap).updateIndices at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:256 1 0x0000000000e1b7bc in k8s.io/client-go/tools/cache.(*threadSafeMap).Add at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/thread_safe_store.go:78 2 0x0000000000e1ad30 in k8s.io/client-go/tools/cache.(*cache).Add at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/store.go:155 3 0x0000000000e191bc in 
k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:557 4 0x0000000000e20b80 in k8s.io/client-go/tools/cache.(*sharedIndexInformer).HandleDeltas-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:527 5 0x0000000000e1150c in k8s.io/client-go/tools/cache.(*DeltaFIFO).Pop at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/delta_fifo.go:539 6 0x0000000000e0f354 in k8s.io/client-go/tools/cache.(*controller).processLoop at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:183 7 0x0000000000e20a90 in k8s.io/client-go/tools/cache.(*controller).processLoop-fm at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:181 8 0x00000000006b8e4c in k8s.io/apimachinery/pkg/util/wait.BackoffUntil.func1 at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:155 9 0x00000000006b7eac in k8s.io/apimachinery/pkg/util/wait.BackoffUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:156 10 0x00000000006b7e20 in k8s.io/apimachinery/pkg/util/wait.JitterUntil at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:133 11 0x0000000000e0f0e8 in k8s.io/apimachinery/pkg/util/wait.Until at /opt/gopath/pkg/mod/k8s.io/apimachinery@v0.0.0-20210701054147-830375057167/pkg/util/wait/wait.go:90 12 0x0000000000e0f0e8 in k8s.io/client-go/tools/cache.(*controller).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/controller.go:154 13 0x0000000000e180bc in k8s.io/client-go/tools/cache.(*sharedIndexInformer).Run at /opt/gopath/pkg/mod/k8s.io/client-go@v0.0.0-20210701054555-843bb800b12a/tools/cache/shared_informer.go:410 14 
0x00000000000718dc in runtime.goexit at /usr/local/go/src/runtime/asm_arm64.s:1148 (dlv) p oldObj interface {} nil (dlv) p newObj interface {}(*k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo) *{ TypeMeta: k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta { Kind: "Foo", APIVersion: "samplecontroller.k8s.io/v1alpha1",}, ObjectMeta: k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta { Name: "example-foo", GenerateName: "", Namespace: "default", SelfLink: "/apis/samplecontroller.k8s.io/v1alpha1/namespaces/default/foos/e...+10 more", UID: "b95d5147-f93a-4e66-af15-53f095179b56", ResourceVersion: "1892878", Generation: 2, CreationTimestamp: (*"k8s.io/apimachinery/pkg/apis/meta/v1.Time")(0x40004b66c8), DeletionTimestamp: *k8s.io/apimachinery/pkg/apis/meta/v1.Time nil, DeletionGracePeriodSeconds: *int64 nil, Labels: map[string]string nil, Annotations: map[string]string [...], OwnerReferences: []k8s.io/apimachinery/pkg/apis/meta/v1.OwnerReference len: 0, cap: 0, nil, Finalizers: []string len: 0, cap: 0, nil, ClusterName: "", ManagedFields: []k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry len: 2, cap: 2, [ (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40003ae3c0), (*"k8s.io/apimachinery/pkg/apis/meta/v1.ManagedFieldsEntry")(0x40003ae420), ],}, Spec: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec { DeploymentName: "example-foo", Replicas: *1,}, Status: k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus {AvailableReplicas: 0},} (dlv)
Indexer 使用 thread_safe_store.go 中的 threadSafeMap 存储数据。threadSafeMap 是一个线程安全并且带有索引功能的 map,数据只会存放在内存中,每次操作都会先加锁。
(dlv) p c.indexers k8s.io/client-go/tools/cache.Indexers [ "namespace": k8s.io/client-go/tools/cache.MetaNamespaceIndexFunc, ] (dlv) p c.indices k8s.io/client-go/tools/cache.Indices [] (dlv) p c.items map[string]interface {} [ "default/example-foo": *k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.Foo { TypeMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.TypeMeta")(0x40004b6640), ObjectMeta: (*"k8s.io/apimachinery/pkg/apis/meta/v1.ObjectMeta")(0x40004b6660), Spec: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooSpec")(0x40004b6758), Status: (*"k8s.io/sample-controller/pkg/apis/samplecontroller/v1alpha1.FooStatus")(0x40004b6770),}, ] (dlv) p c *k8s.io/client-go/tools/cache.threadSafeMap { lock: sync.RWMutex { w: (*sync.Mutex)(0x400059d2c0), writerSem: 0, readerSem: 0, readerCount: -1073741824, readerWait: 0,}, items: map[string]interface {} [ "default/example-foo": ..., ], indexers: k8s.io/client-go/tools/cache.Indexers [ "namespace": k8s.io/client-go/tools/cache.MetaNamespaceIndexFunc, ], indices: k8s.io/client-go/tools/cache.Indices [],} (dlv)
dlv exec ./sample-controller -- --kubeconfig=$HOME/.kube/config