Unverified Commit 73d34db6 authored by jlamillan

Use k8s informer cache instead of active API server calls in ingress and service sources.

parent 3301c968
Showing with 160 additions and 34 deletions
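For orientation, here is a minimal standalone sketch of the pattern this commit applies to the ingress and service sources: build a namespaced shared informer factory, start it, wait for the local cache to sync, then serve reads from the lister instead of calling the API server on every `Endpoints()` invocation. The package and function names (`sketch`, `ListServicesFromCache`) are illustrative and do not appear in the diff.

```go
// sketch.go — a hedged sketch of the informer-backed read path, not code from the diff.
package sketch

import (
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/wait"
	kubeinformers "k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// ListServicesFromCache reads services from an informer cache rather than
// issuing a CoreV1().Services().List() call against the API server.
func ListServicesFromCache(client kubernetes.Interface, namespace string) ([]*corev1.Service, error) {
	// Resync period 0 prevents periodic reprocessing when nothing has changed.
	factory := kubeinformers.NewSharedInformerFactoryWithOptions(client, 0, kubeinformers.WithNamespace(namespace))
	serviceInformer := factory.Core().V1().Services()

	// Registering a (no-op) event handler ensures the factory actually starts this informer.
	serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {},
	})

	factory.Start(wait.NeverStop)

	// Block until the local cache is populated (up to 60 seconds), as the new
	// NewIngressSource/NewServiceSource constructors in this commit do.
	if err := wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
		return serviceInformer.Informer().HasSynced(), nil
	}); err != nil {
		return nil, fmt.Errorf("failed to sync cache: %v", err)
	}

	// All subsequent reads are served from the in-memory store.
	return serviceInformer.Lister().Services(namespace).List(labels.Everything())
}
```

The ingress source in the diff follows the same shape, using `factory.Extensions().V1beta1().Ingresses()` and its lister.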
......@@ -23,14 +23,17 @@ import (
"strings"
"text/template"
"github.com/kubernetes-incubator/external-dns/endpoint"
log "github.com/sirupsen/logrus"
"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
kubeinformers "k8s.io/client-go/informers"
extinformers "k8s.io/client-go/informers/extensions/v1beta1"
"k8s.io/client-go/kubernetes"
"github.com/kubernetes-incubator/external-dns/endpoint"
"k8s.io/client-go/tools/cache"
"time"
)
// ingressSource is an implementation of Source for Kubernetes ingress objects.
......@@ -44,6 +47,7 @@ type ingressSource struct {
fqdnTemplate *template.Template
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
ingressInformer extinformers.IngressInformer
}
// NewIngressSource creates a new ingressSource with the given config.
......@@ -61,31 +65,57 @@ func NewIngressSource(kubeClient kubernetes.Interface, namespace, annotationFilt
}
}
return &ingressSource{
// Use shared informer to listen for add/update/delete of ingresses in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed.
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
ingressInformer := informerFactory.Extensions().V1beta1().Ingresses()
// Add default resource event handlers to properly initialize informer.
ingressInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
},
},
)
// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return ingressInformer.Informer().HasSynced() == true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
}
sc := &ingressSource{
client: kubeClient,
namespace: namespace,
annotationFilter: annotationFilter,
fqdnTemplate: tmpl,
combineFQDNAnnotation: combineFqdnAnnotation,
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
}, nil
ingressInformer: ingressInformer,
}
return sc, nil
}
// Endpoints returns endpoint objects for each host-target combination that should be processed.
// Retrieves all ingress resources on all namespaces
func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
ingresses, err := sc.client.Extensions().Ingresses(sc.namespace).List(metav1.ListOptions{})
ingresses, err := sc.ingressInformer.Lister().Ingresses(sc.namespace).List(labels.Everything())
if err != nil {
return nil, err
}
ingresses.Items, err = sc.filterByAnnotations(ingresses.Items)
ingresses, err = sc.filterByAnnotations(ingresses)
if err != nil {
return nil, err
}
endpoints := []*endpoint.Endpoint{}
for _, ing := range ingresses.Items {
for _, ing := range ingresses {
// Check controller annotation to see if we are responsible.
controller, ok := ing.Annotations[controllerAnnotationKey]
if ok && controller != controllerAnnotationValue {
......@@ -94,11 +124,11 @@ func (sc *ingressSource) Endpoints() ([]*endpoint.Endpoint, error) {
continue
}
ingEndpoints := endpointsFromIngress(&ing, sc.ignoreHostnameAnnotation)
ingEndpoints := endpointsFromIngress(ing, sc.ignoreHostnameAnnotation)
// apply template if host is missing on ingress
if (sc.combineFQDNAnnotation || len(ingEndpoints) == 0) && sc.fqdnTemplate != nil {
iEndpoints, err := sc.endpointsFromTemplate(&ing)
iEndpoints, err := sc.endpointsFromTemplate(ing)
if err != nil {
return nil, err
}
......@@ -161,7 +191,7 @@ func (sc *ingressSource) endpointsFromTemplate(ing *v1beta1.Ingress) ([]*endpoin
}
// filterByAnnotations filters a list of ingresses by a given annotation selector.
func (sc *ingressSource) filterByAnnotations(ingresses []v1beta1.Ingress) ([]v1beta1.Ingress, error) {
func (sc *ingressSource) filterByAnnotations(ingresses []*v1beta1.Ingress) ([]*v1beta1.Ingress, error) {
labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
if err != nil {
return nil, err
......@@ -176,7 +206,7 @@ func (sc *ingressSource) filterByAnnotations(ingresses []v1beta1.Ingress) ([]v1b
return ingresses, nil
}
filteredList := []v1beta1.Ingress{}
filteredList := []*v1beta1.Ingress{}
for _, ingress := range ingresses {
// convert the ingress' annotations to an equivalent label selector
......@@ -191,7 +221,7 @@ func (sc *ingressSource) filterByAnnotations(ingresses []v1beta1.Ingress) ([]v1b
return filteredList, nil
}
func (sc *ingressSource) setResourceLabel(ingress v1beta1.Ingress, endpoints []*endpoint.Endpoint) {
func (sc *ingressSource) setResourceLabel(ingress *v1beta1.Ingress, endpoints []*endpoint.Endpoint) {
for _, ep := range endpoints {
ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("ingress/%s/%s", ingress.Namespace, ingress.Name)
}
......
......@@ -18,10 +18,12 @@ package source
import (
"testing"
"time"
"k8s.io/api/core/v1"
"k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes/fake"
"github.com/kubernetes-incubator/external-dns/endpoint"
......@@ -990,7 +992,19 @@ func testIngressEndpoints(t *testing.T) {
require.NoError(t, err)
}
res, err := ingressSource.Endpoints()
var res []*endpoint.Endpoint
var err error
// wait up to a few seconds for new resources to appear in informer cache.
err = wait.Poll(time.Second, 3*time.Second, func() (bool, error) {
res, err = ingressSource.Endpoints()
if err != nil {
// stop waiting if we get an error
return true, err
}
return len(res) >= len(ti.expected), nil
})
if ti.expectError {
assert.Error(t, err)
} else {
......
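The test updates here and in service_test.go below follow from the same design: reads now come from an informer cache that is filled asynchronously after the fake client creates objects, so the tests poll `Endpoints()` briefly instead of asserting immediately. A hedged sketch of that polling pattern as a reusable helper; `endpointLister` and `waitForEndpoints` are illustrative names, not helpers that exist in the diff.

```go
// A sketch of the eventual-consistency wait used by the updated tests.
package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"

	"github.com/kubernetes-incubator/external-dns/endpoint"
)

// endpointLister matches the Endpoints() method shared by the sources.
type endpointLister interface {
	Endpoints() ([]*endpoint.Endpoint, error)
}

// waitForEndpoints polls src once per second, for up to three seconds, until
// it reports at least the expected number of endpoints.
func waitForEndpoints(src endpointLister, want int) ([]*endpoint.Endpoint, error) {
	var res []*endpoint.Endpoint
	err := wait.Poll(time.Second, 3*time.Second, func() (bool, error) {
		var err error
		res, err = src.Endpoints()
		if err != nil {
			// Stop polling immediately on error.
			return true, err
		}
		return len(res) >= want, nil
	})
	return res, err
}
```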
......@@ -19,6 +19,9 @@ package source
import (
"bytes"
"fmt"
kubeinformers "k8s.io/client-go/informers"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
"sort"
"strings"
"text/template"
......@@ -29,9 +32,11 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
"github.com/kubernetes-incubator/external-dns/endpoint"
"time"
)
const (
......@@ -54,6 +59,9 @@ type serviceSource struct {
ignoreHostnameAnnotation bool
publishInternal bool
publishHostIP bool
serviceInformer coreinformers.ServiceInformer
podInformer coreinformers.PodInformer
nodeInformer coreinformers.NodeInformer
serviceTypeFilter map[string]struct{}
}
......@@ -72,6 +80,47 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
}
}
// Use shared informers to listen for add/update/delete of services/pods/nodes in the specified namespace.
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
podInformer := informerFactory.Core().V1().Pods()
nodeInformer := informerFactory.Core().V1().Nodes()
// Add default resource event handlers to properly initialize informer.
serviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("service added")
},
},
)
podInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("pod added")
},
},
)
nodeInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("node added")
},
},
)
// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return serviceInformer.Informer().HasSynced() == true, nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
}
// Transform the slice into a map so it will
// be much easier and faster to filter later
serviceTypes := make(map[string]struct{})
......@@ -89,24 +138,27 @@ func NewServiceSource(kubeClient kubernetes.Interface, namespace, annotationFilt
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
publishInternal: publishInternal,
publishHostIP: publishHostIP,
serviceInformer: serviceInformer,
podInformer: podInformer,
nodeInformer: nodeInformer,
serviceTypeFilter: serviceTypes,
}, nil
}
// Endpoints returns endpoint objects for each service that should be processed.
func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
services, err := sc.client.CoreV1().Services(sc.namespace).List(metav1.ListOptions{})
services, err := sc.serviceInformer.Lister().Services(sc.namespace).List(labels.Everything())
if err != nil {
return nil, err
}
services.Items, err = sc.filterByAnnotations(services.Items)
services, err = sc.filterByAnnotations(services)
if err != nil {
return nil, err
}
// filter on service types if at least one has been provided
if len(sc.serviceTypeFilter) > 0 {
services.Items = sc.filterByServiceType(services.Items)
services = sc.filterByServiceType(services)
}
// get the ip addresses of all the nodes and cache them for this run
......@@ -117,7 +169,7 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
endpoints := []*endpoint.Endpoint{}
for _, svc := range services.Items {
for _, svc := range services {
// Check controller annotation to see if we are responsible.
controller, ok := svc.Annotations[controllerAnnotationKey]
if ok && controller != controllerAnnotationValue {
......@@ -126,16 +178,16 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
continue
}
svcEndpoints := sc.endpoints(&svc, nodeTargets)
svcEndpoints := sc.endpoints(svc, nodeTargets)
// process legacy annotations if no endpoints were returned and compatibility mode is enabled.
if len(svcEndpoints) == 0 && sc.compatibility != "" {
svcEndpoints = legacyEndpointsFromService(&svc, sc.compatibility)
svcEndpoints = legacyEndpointsFromService(svc, sc.compatibility)
}
// apply template if none of the above is found
if (sc.combineFQDNAnnotation || len(svcEndpoints) == 0) && sc.fqdnTemplate != nil {
sEndpoints, err := sc.endpointsFromTemplate(&svc, nodeTargets)
sEndpoints, err := sc.endpointsFromTemplate(svc, nodeTargets)
if err != nil {
return nil, err
}
......@@ -167,14 +219,23 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
func (sc *serviceSource) extractHeadlessEndpoints(svc *v1.Service, hostname string, ttl endpoint.TTL) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint
pods, err := sc.client.CoreV1().Pods(svc.Namespace).List(metav1.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String()})
labelSelector, err := metav1.ParseToLabelSelector(labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String())
if err != nil {
return nil
}
selector, err := metav1.LabelSelectorAsSelector(labelSelector)
if err != nil {
return nil
}
pods, err := sc.podInformer.Lister().Pods(svc.Namespace).List(selector)
if err != nil {
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
return endpoints
}
targetsByHeadlessDomain := make(map[string][]string)
for _, v := range pods.Items {
for _, v := range pods {
headlessDomain := hostname
if v.Spec.Hostname != "" {
headlessDomain = v.Spec.Hostname + "." + headlessDomain
......@@ -251,7 +312,7 @@ func (sc *serviceSource) endpoints(svc *v1.Service, nodeTargets endpoint.Targets
}
// filterByAnnotations filters a list of services by a given annotation selector.
func (sc *serviceSource) filterByAnnotations(services []v1.Service) ([]v1.Service, error) {
func (sc *serviceSource) filterByAnnotations(services []*v1.Service) ([]*v1.Service, error) {
labelSelector, err := metav1.ParseToLabelSelector(sc.annotationFilter)
if err != nil {
return nil, err
......@@ -266,7 +327,7 @@ func (sc *serviceSource) filterByAnnotations(services []v1.Service) ([]v1.Servic
return services, nil
}
filteredList := []v1.Service{}
filteredList := []*v1.Service{}
for _, service := range services {
// convert the service's annotations to an equivalent label selector
......@@ -282,8 +343,8 @@ func (sc *serviceSource) filterByAnnotations(services []v1.Service) ([]v1.Servic
}
// filterByServiceType filters services according their types
func (sc *serviceSource) filterByServiceType(services []v1.Service) []v1.Service {
filteredList := []v1.Service{}
func (sc *serviceSource) filterByServiceType(services []*v1.Service) []*v1.Service {
filteredList := []*v1.Service{}
for _, service := range services {
// Check if the service is of the given type or not
if _, ok := sc.serviceTypeFilter[string(service.Spec.Type)]; ok {
......@@ -294,7 +355,7 @@ func (sc *serviceSource) filterByServiceType(services []v1.Service) []v1.Service
return filteredList
}
func (sc *serviceSource) setResourceLabel(service v1.Service, endpoints []*endpoint.Endpoint) {
func (sc *serviceSource) setResourceLabel(service *v1.Service, endpoints []*endpoint.Endpoint) {
for _, ep := range endpoints {
ep.Labels[endpoint.ResourceLabelKey] = fmt.Sprintf("service/%s/%s", service.Namespace, service.Name)
}
......@@ -392,7 +453,7 @@ func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) {
externalIPs endpoint.Targets
)
nodes, err := sc.client.CoreV1().Nodes().List(metav1.ListOptions{})
nodes, err := sc.nodeInformer.Lister().List(labels.Everything())
if err != nil {
if errors.IsForbidden(err) {
// Return an empty list because it makes sense to continue and try other sources.
......@@ -402,7 +463,7 @@ func (sc *serviceSource) extractNodeTargets() (endpoint.Targets, error) {
return nil, err
}
for _, node := range nodes.Items {
for _, node := range nodes {
for _, address := range node.Status.Addresses {
switch address.Type {
case v1.NodeExternalIP:
......
......@@ -17,8 +17,10 @@ limitations under the License.
package source
import (
"k8s.io/apimachinery/pkg/util/wait"
"net"
"testing"
"time"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
......@@ -1087,7 +1089,18 @@ func testServiceSourceEndpoints(t *testing.T) {
)
require.NoError(t, err)
endpoints, err := client.Endpoints()
var res []*endpoint.Endpoint
// wait up to a few seconds for new resources to appear in informer cache.
err = wait.Poll(time.Second, 3*time.Second, func() (bool, error) {
res, err = client.Endpoints()
if err != nil {
// stop waiting if we get an error
return true, err
}
return len(res) >= len(tc.expected), nil
})
if tc.expectError {
require.Error(t, err)
} else {
......@@ -1095,7 +1108,7 @@ func testServiceSourceEndpoints(t *testing.T) {
}
// Validate returned endpoints against desired endpoints.
validateEndpoints(t, endpoints, tc.expected)
validateEndpoints(t, res, tc.expected)
})
}
}
......
......@@ -17,9 +17,10 @@ limitations under the License.
package source
import (
"testing"
"github.com/kubernetes-incubator/external-dns/endpoint"
"sort"
"strings"
"testing"
)
// test helper functions
......@@ -28,6 +29,13 @@ func validateEndpoints(t *testing.T, endpoints, expected []*endpoint.Endpoint) {
if len(endpoints) != len(expected) {
t.Fatalf("expected %d endpoints, got %d", len(expected), len(endpoints))
}
// Make sure endpoints are sorted - validateEndpoint() depends on it.
sort.SliceStable(endpoints, func(i, j int) bool {
return strings.Compare(endpoints[i].DNSName, endpoints[j].DNSName) < 0
})
sort.SliceStable(expected, func(i, j int) bool {
return strings.Compare(expected[i].DNSName, expected[j].DNSName) < 0
})
for i := range endpoints {
validateEndpoint(t, endpoints[i], expected[i])
......