Commit cbc539f8 authored by Arttii's avatar Arttii Committed by Henning Jacobs
Browse files

Add support for Headless hostPort services (#324)

* Added initial support for Headless services

* service.go: Fixed some formatting

* forgot to run gometalinter

* service: implemented alternative proposal, to reuse existing annotation

* source: some refactoring
parent eb51f7f5
Showing with 167 additions and 0 deletions
+167 -0
File added
......@@ -121,6 +121,30 @@ func (sc *serviceSource) Endpoints() ([]*endpoint.Endpoint, error) {
return endpoints, nil
}
func (sc *serviceSource) extractHeadlessEndpoint(svc *v1.Service, hostname string) []*endpoint.Endpoint {
var endpoints []*endpoint.Endpoint
pods, err := sc.client.CoreV1().Pods(svc.Namespace).List(metav1.ListOptions{LabelSelector: labels.Set(svc.Spec.Selector).AsSelectorPreValidated().String()})
if err != nil {
log.Errorf("List Pods of service[%s] error:%v", svc.GetName(), err)
return endpoints
}
for _, v := range pods.Items {
headlessDomain := v.Spec.Hostname + "." + hostname
log.Debugf("Generating matching endpoint %s with HostIP %s", headlessDomain, v.Status.HostIP)
// To reduce traffice on the DNS API only add record for running Pods. Good Idea?
if v.Status.Phase == v1.PodRunning {
endpoints = append(endpoints, endpoint.NewEndpoint(headlessDomain, v.Status.HostIP, endpoint.RecordTypeA))
} else {
log.Debugf("Pod %s is not in running phase", v.Spec.Hostname)
}
}
return endpoints
}
func (sc *serviceSource) endpointsFromTemplate(svc *v1.Service) ([]*endpoint.Endpoint, error) {
var endpoints []*endpoint.Endpoint
......@@ -197,6 +221,10 @@ func (sc *serviceSource) generateEndpoints(svc *v1.Service, hostname string) []*
if sc.publishInternal {
endpoints = append(endpoints, extractServiceIps(svc, hostname)...)
}
if svc.Spec.ClusterIP == v1.ClusterIPNone {
endpoints = append(endpoints, sc.extractHeadlessEndpoint(svc, hostname)...)
}
}
return endpoints
}
......
......@@ -891,6 +891,145 @@ func TestClusterIpServices(t *testing.T) {
}
}
// TestHeadlessServices tests that headless services generate the correct endpoints.
func TestHeadlessServices(t *testing.T) {
for _, tc := range []struct {
title string
targetNamespace string
svcNamespace string
svcName string
svcType v1.ServiceType
compatibility string
fqdnTemplate string
labels map[string]string
annotations map[string]string
clusterIP string
hostIP string
selector map[string]string
lbs []string
hostnames []string
phases []v1.PodPhase
expected []*endpoint.Endpoint
expectError bool
}{
{
"annotated Headless services return endpoints for each selected Pod",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
"1.1.1.1",
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodRunning},
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Target: "1.1.1.1"},
{DNSName: "foo-1.service.example.org", Target: "1.1.1.1"},
},
false,
},
{
"annotated Headless services return endpoints for each selected Pod, which are in running state",
"",
"testing",
"foo",
v1.ServiceTypeClusterIP,
"",
"",
map[string]string{"component": "foo"},
map[string]string{
hostnameAnnotationKey: "service.example.org",
},
v1.ClusterIPNone,
"1.1.1.1",
map[string]string{
"component": "foo",
},
[]string{},
[]string{"foo-0", "foo-1"},
[]v1.PodPhase{v1.PodRunning, v1.PodFailed},
[]*endpoint.Endpoint{
{DNSName: "foo-0.service.example.org", Target: "1.1.1.1"},
},
false,
},
} {
t.Run(tc.title, func(t *testing.T) {
// Create a Kubernetes testing client
kubernetes := fake.NewSimpleClientset()
service := &v1.Service{
Spec: v1.ServiceSpec{
Type: tc.svcType,
ClusterIP: tc.clusterIP,
Selector: tc.selector,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: tc.svcName,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.ServiceStatus{},
}
_, err := kubernetes.CoreV1().Services(service.Namespace).Create(service)
require.NoError(t, err)
for i, hostname := range tc.hostnames {
pod := &v1.Pod{
Spec: v1.PodSpec{
Containers: []v1.Container{},
Hostname: hostname,
},
ObjectMeta: metav1.ObjectMeta{
Namespace: tc.svcNamespace,
Name: hostname,
Labels: tc.labels,
Annotations: tc.annotations,
},
Status: v1.PodStatus{
HostIP: tc.hostIP,
Phase: tc.phases[i],
},
}
_, err = kubernetes.CoreV1().Pods(tc.svcNamespace).Create(pod)
require.NoError(t, err)
}
// Create our object under test and get the endpoints.
client, _ := NewServiceSource(
kubernetes,
tc.targetNamespace,
tc.fqdnTemplate,
tc.compatibility,
true,
)
require.NoError(t, err)
endpoints, err := client.Endpoints()
if tc.expectError {
require.Error(t, err)
} else {
require.NoError(t, err)
}
// Validate returned endpoints against desired endpoints.
validateEndpoints(t, endpoints, tc.expected)
})
}
}
func BenchmarkServiceEndpoints(b *testing.B) {
kubernetes := fake.NewSimpleClientset()
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment