Unverified Commit 11d27968 authored by Kubernetes Prow Robot's avatar Kubernetes Prow Robot Committed by GitHub
Browse files

Merge pull request #1728 from tariq1890/istio-informers

add event handler for istio gateway and virtualservice source
parents 306b3377 58ceebec
master Raffo-patch-1 correctly-update-aws-records-when-type-changes dansimone/support-prefer-ingress-annotations dependabot/go_modules/github.com/Azure/azure-sdk-for-go-61.4.0incompatible dependabot/go_modules/github.com/aliyun/alibaba-cloud-sdk-go-1.61.1473 dependabot/go_modules/github.com/exoscale/egoscale-1.19.0 dependabot/go_modules/github.com/projectcontour/contour-1.20.0 dependabot/go_modules/k8s.io/apimachinery-0.23.3 fix-1820 gh-pages infoblox-multiple-A-records-fix normalize raffo-fix-2348 raffo/add-dependabot raffo/add-trivy-scanning raffo/arm raffo/arm32v7 raffo/bump-ci-timeout raffo/bump-cloudbuild-timeout raffo/bump-deps-sec raffo/bump-kustomize raffo/bump-kustomize-1 raffo/bump-kustomize-version-0.7.5 raffo/bump-modules raffo/codeQL raffo/drop-the-changelog raffo/e2e-aws raffo/fix-1820 raffo/fix-1936 raffo/fix-build raffo/fix-dependabot raffo/fix-ns-deletion raffo/fix-scaleway-security raffo/fix-that-typo raffo/fix-trivy raffo/fix-trivy-again raffo/fix-vulnerabilities raffo/goarm raffo/knolog raffo/multiarch raffo/multiarch-docs raffo/new-ingress-resource raffo/release-conventions raffo/release-note-patch raffo/release-script raffo/release-script-update raffo/remove-azure-test raffo/remove-broken-link raffo/revert-tzdata raffo/update-kustomize-080 raffo/update-v0.10-role raffo/v0.7.6 validate-txt-prefix v1.0.0-mf v0.10.2 v0.10.1 v0.10.0 v0.9.0 v0.8.0 v0.7.6 v0.7.5 v0.7.4 external-dns-helm-chart-1.7.1 external-dns-helm-chart-1.7.0 external-dns-helm-chart-1.6.0 external-dns-helm-chart-1.5.0 external-dns-helm-chart-1.4.1 external-dns-helm-chart-1.4.0 external-dns-helm-chart-1.3.2 external-dns-helm-chart-1.3.1 external-dns-helm-chart-1.3.0 external-dns-helm-chart-1.2.0
No related merge requests found
Showing with 75 additions and 2 deletions
+75 -2
......@@ -28,6 +28,8 @@ import (
log "github.com/sirupsen/logrus"
networkingv1alpha3 "istio.io/client-go/pkg/apis/networking/v1alpha3"
istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
......@@ -51,6 +53,7 @@ type gatewaySource struct {
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
serviceInformer coreinformers.ServiceInformer
gatewayInformer networkingv1alpha3informer.GatewayInformer
}
// NewIstioGatewaySource creates a new gatewaySource with the given config.
......@@ -81,6 +84,8 @@ func NewIstioGatewaySource(
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
istioInformerFactory := istioinformers.NewSharedInformerFactory(istioClient, 0)
gatewayInformer := istioInformerFactory.Networking().V1alpha3().Gateways()
// Add default resource event handlers to properly initialize informer.
serviceInformer.Informer().AddEventHandler(
......@@ -91,8 +96,17 @@ func NewIstioGatewaySource(
},
)
gatewayInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("gateway added")
},
},
)
// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
istioInformerFactory.Start(wait.NeverStop)
// wait for the local cache to be populated.
err = poll(time.Second, 60*time.Second, func() (bool, error) {
......@@ -102,6 +116,14 @@ func NewIstioGatewaySource(
return nil, fmt.Errorf("failed to sync cache: %v", err)
}
// wait for the local cache to be populated.
err = poll(time.Second, 60*time.Second, func() (bool, error) {
return gatewayInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
}
return &gatewaySource{
kubeClient: kubeClient,
istioClient: istioClient,
......@@ -111,6 +133,7 @@ func NewIstioGatewaySource(
combineFQDNAnnotation: combineFQDNAnnotation,
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
serviceInformer: serviceInformer,
gatewayInformer: gatewayInformer,
}, nil
}
......@@ -180,9 +203,23 @@ func (sc *gatewaySource) Endpoints(ctx context.Context) ([]*endpoint.Endpoint, e
return endpoints, nil
}
// TODO(tariq1890): Implement this once we have evaluated and tested GatewayInformers
// AddEventHandler adds an event handler that should be triggered if the watched Istio Gateway changes.
func (sc *gatewaySource) AddEventHandler(ctx context.Context, handler func()) {
log.Debug("Adding event handler for Istio Gateway")
sc.gatewayInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler()
},
UpdateFunc: func(old interface{}, new interface{}) {
handler()
},
DeleteFunc: func(obj interface{}) {
handler()
},
},
)
}
// filterByAnnotations filters a list of configs by a given annotation selector.
......
......@@ -29,6 +29,8 @@ import (
log "github.com/sirupsen/logrus"
istioclient "istio.io/client-go/pkg/clientset/versioned"
istioinformers "istio.io/client-go/pkg/informers/externalversions"
networkingv1alpha3informer "istio.io/client-go/pkg/informers/externalversions/networking/v1alpha3"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/wait"
......@@ -55,6 +57,7 @@ type virtualServiceSource struct {
combineFQDNAnnotation bool
ignoreHostnameAnnotation bool
serviceInformer coreinformers.ServiceInformer
virtualserviceInformer networkingv1alpha3informer.VirtualServiceInformer
}
// NewIstioVirtualServiceSource creates a new virtualServiceSource with the given config.
......@@ -85,6 +88,8 @@ func NewIstioVirtualServiceSource(
// Set resync period to 0, to prevent processing when nothing has changed
informerFactory := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 0, kubeinformers.WithNamespace(namespace))
serviceInformer := informerFactory.Core().V1().Services()
istioInformerFactory := istioinformers.NewSharedInformerFactory(istioClient, 0)
virtualServiceInformer := istioInformerFactory.Networking().V1alpha3().VirtualServices()
// Add default resource event handlers to properly initialize informer.
serviceInformer.Informer().AddEventHandler(
......@@ -95,8 +100,17 @@ func NewIstioVirtualServiceSource(
},
)
virtualServiceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
log.Debug("virtual service added")
},
},
)
// TODO informer is not explicitly stopped since controller is not passing in its channel.
informerFactory.Start(wait.NeverStop)
istioInformerFactory.Start(wait.NeverStop)
// wait for the local cache to be populated.
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
......@@ -106,6 +120,13 @@ func NewIstioVirtualServiceSource(
return nil, fmt.Errorf("failed to sync cache: %v", err)
}
err = wait.Poll(time.Second, 60*time.Second, func() (bool, error) {
return virtualServiceInformer.Informer().HasSynced(), nil
})
if err != nil {
return nil, fmt.Errorf("failed to sync cache: %v", err)
}
return &virtualServiceSource{
kubeClient: kubeClient,
istioClient: istioClient,
......@@ -115,6 +136,7 @@ func NewIstioVirtualServiceSource(
combineFQDNAnnotation: combineFQDNAnnotation,
ignoreHostnameAnnotation: ignoreHostnameAnnotation,
serviceInformer: serviceInformer,
virtualserviceInformer: virtualServiceInformer,
}, nil
}
......@@ -179,9 +201,23 @@ func (sc *virtualServiceSource) Endpoints(ctx context.Context) ([]*endpoint.Endp
return endpoints, nil
}
// TODO(tariq1890): Implement this once we have evaluated and tested VirtualServiceInformers
// AddEventHandler adds an event handler that should be triggered if the watched Istio VirtualService changes.
func (sc *virtualServiceSource) AddEventHandler(ctx context.Context, handler func()) {
log.Debug("Adding event handler for Istio VirtualService")
sc.virtualserviceInformer.Informer().AddEventHandler(
cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
handler()
},
UpdateFunc: func(old interface{}, new interface{}) {
handler()
},
DeleteFunc: func(obj interface{}) {
handler()
},
},
)
}
func (sc *virtualServiceSource) getGateway(ctx context.Context, gatewayStr string, virtualService networkingv1alpha3.VirtualService) *networkingv1alpha3.Gateway {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment