44 "context"
55 "crypto/sha256"
66 "encoding/base32"
7+ "fmt"
78 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/api/v1alpha1"
89 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/cloudmap"
910 "github.com/aws/aws-cloud-map-mcs-controller-for-k8s/pkg/model"
@@ -24,6 +25,8 @@ const (
2425 // TODO move to configuration
2526 syncPeriod = 2 * time .Second
2627
28+ maxEndpointsPerSlice = 100
29+
2730 // DerivedServiceAnnotation annotates a ServiceImport with derived Service name
2831 DerivedServiceAnnotation = "multicluster.k8s.aws/derived-service"
2932
@@ -156,7 +159,7 @@ func (r *CloudMapReconciler) reconcileService(ctx context.Context, svc *model.Se
156159 return err
157160 }
158161
159- err = r .updateEndpointSlices (ctx , svcImport , svc , derivedService )
162+ err = r .updateEndpointSlices (ctx , svcImport , svc . Endpoints , derivedService )
160163 if err != nil {
161164 return err
162165 }
@@ -199,7 +202,7 @@ func (r *CloudMapReconciler) getDerivedService(ctx context.Context, namespace st
199202}
200203
201204func (r * CloudMapReconciler ) createAndGetDerivedService (ctx context.Context , svc * model.Service , svcImport * v1alpha1.ServiceImport ) (* v1.Service , error ) {
202- toCreate := createDerivedServiceStruct (svc , svcImport )
205+ toCreate := createDerivedServiceStruct (svc . Endpoints , svcImport )
203206 if err := r .Client .Create (ctx , toCreate ); err != nil {
204207 return nil , err
205208 }
@@ -208,25 +211,100 @@ func (r *CloudMapReconciler) createAndGetDerivedService(ctx context.Context, svc
208211 return r .getDerivedService (ctx , svc .Namespace , svcImport .Annotations [DerivedServiceAnnotation ])
209212}
210213
211- func (r * CloudMapReconciler ) updateEndpointSlices (ctx context.Context , svcImport * v1alpha1.ServiceImport , cloudMapService * model.Service , svc * v1.Service ) error {
214+ func (r * CloudMapReconciler ) updateEndpointSlices (ctx context.Context , svcImport * v1alpha1.ServiceImport , desiredEndpoints [] * model.Endpoint , svc * v1.Service ) error {
212215 existingSlicesList := discovery.EndpointSliceList {}
213- var existingSlices []* discovery.EndpointSlice
214216 if err := r .Client .List (ctx , & existingSlicesList ,
215217 client .InNamespace (svc .Namespace ), client.MatchingLabels {discovery .LabelServiceName : svc .Name }); err != nil {
216218 return err
217219 }
218- if len (existingSlicesList .Items ) == 0 {
219- // create new endpoint slice
220- existingSlices = createEndpointSlicesStruct (svcImport , cloudMapService , svc )
221- for _ , slice := range existingSlices {
222- if err := r .Client .Create (ctx , slice ); err != nil {
223- return err
220+
221+ desiredPorts := extractEndpointPorts (desiredEndpoints )
222+ matchedEndpoints := make (map [string ]* discovery.Endpoint )
223+ endpointsToCreate := make ([]discovery.Endpoint , 0 )
224+
225+ // populate map of existing endpoints in slices for lookup efficiency
226+ existingEndpointMap := make (map [string ]* discovery.Endpoint )
227+ for _ , existingSlice := range existingSlicesList .Items {
228+ for _ , existingEndpoint := range existingSlice .Endpoints {
229+ ref := existingEndpoint
230+ existingEndpointMap [ref .Addresses [0 ]] = & ref
231+ }
232+ }
233+
234+ // check if all desired endpoints are in an endpoint slice already
235+ for _ , desiredEndpoint := range desiredEndpoints {
236+ match , exists := existingEndpointMap [desiredEndpoint .IP ]
237+ if exists {
238+ matchedEndpoints [desiredEndpoint .IP ] = match
239+ } else {
240+ endpointsToCreate = append (endpointsToCreate , createEndpointForSlice (svc , desiredEndpoint .IP ))
241+ }
242+ }
243+
244+ // check if all endpoints in slices match a desired endpoint,
245+ for _ , existingSlice := range existingSlicesList .Items {
246+ updatedEndpointList := make ([]discovery.Endpoint , 0 )
247+ for _ , existingEndpoint := range existingSlice .Endpoints {
248+ keep , found := matchedEndpoints [existingEndpoint .Addresses [0 ]]
249+ if found {
250+ updatedEndpointList = append (updatedEndpointList , * keep )
251+ }
252+ }
253+
254+ endpointSliceNeedsUpdate := len (existingSlice .Endpoints ) != len (updatedEndpointList )
255+
256+ // fill endpoint slice with endpoints to create if necessary and there is sufficient room
257+ for _ , endpointToCreate := range endpointsToCreate {
258+ if len (updatedEndpointList ) >= maxEndpointsPerSlice {
259+ break
260+ }
261+ endpointSliceNeedsUpdate = true
262+ updatedEndpointList = append (updatedEndpointList , endpointToCreate )
263+ endpointsToCreate = endpointsToCreate [1 :]
264+ }
265+
266+ sliceToUpdate := existingSlice
267+ sliceToUpdate .Endpoints = updatedEndpointList
268+
269+ // delete empty endpoint slice
270+ if len (updatedEndpointList ) == 0 {
271+ r .Logger .Info ("deleting EndpointSlice" , "namespace" , sliceToUpdate .Namespace , "name" , sliceToUpdate .Name )
272+ if err := r .Client .Delete (ctx , & sliceToUpdate ); err != nil {
273+ return fmt .Errorf ("failed to delete EndpointSlice: %w" , err )
274+ }
275+ continue
276+ }
277+
278+ // needsUpdate = true if ports don't match
279+ if ! EndpointPortsAreEqualIgnoreOrder (desiredPorts , sliceToUpdate .Ports ) {
280+ sliceToUpdate .Ports = desiredPorts
281+ endpointSliceNeedsUpdate = true
282+ }
283+
284+ if endpointSliceNeedsUpdate {
285+ r .Logger .Info ("updating EndpointSlice" , "namespace" , sliceToUpdate .Namespace , "name" , sliceToUpdate .Name )
286+ if err := r .Client .Update (ctx , & sliceToUpdate ); err != nil {
287+ return fmt .Errorf ("failed to update EndpointSlice: %w" , err )
224288 }
225- r .Logger .Info ("created EndpointSlice" , "namespace" , slice .Namespace , "name" , slice .Name )
226289 }
227290 }
228291
229- // TODO check existing slices match Cloud Map endpoints
292+ slicesToCreate := make ([]* discovery.EndpointSlice , 0 )
293+ for len (endpointsToCreate ) > maxEndpointsPerSlice {
294+ slicesToCreate = append (slicesToCreate , createEndpointSliceStruct (svcImport , svc , endpointsToCreate [0 :maxEndpointsPerSlice ], desiredPorts ))
295+ endpointsToCreate = endpointsToCreate [maxEndpointsPerSlice :]
296+ }
297+
298+ if len (endpointsToCreate ) != 0 {
299+ slicesToCreate = append (slicesToCreate , createEndpointSliceStruct (svcImport , svc , endpointsToCreate , desiredPorts ))
300+ }
301+
302+ for _ , newSlice := range slicesToCreate {
303+ r .Logger .Info ("creating EndpointSlice" , "namespace" , newSlice .Namespace )
304+ if err := r .Client .Create (ctx , newSlice ); err != nil {
305+ return fmt .Errorf ("failed to create EndpointSlice: %w" , err )
306+ }
307+ }
230308
231309 return nil
232310}
@@ -238,7 +316,7 @@ func DerivedName(namespace string, name string) string {
238316 return "imported-" + strings .ToLower (base32 .HexEncoding .WithPadding (base32 .NoPadding ).EncodeToString (hash .Sum (nil )))[:10 ]
239317}
240318
241- func createDerivedServiceStruct (svc * model.Service , svcImport * v1alpha1.ServiceImport ) * v1.Service {
319+ func createDerivedServiceStruct (endpoints [] * model.Endpoint , svcImport * v1alpha1.ServiceImport ) * v1.Service {
242320 ownerRef := metav1 .NewControllerRef (svcImport , schema.GroupVersionKind {
243321 Version : svcImport .TypeMeta .APIVersion ,
244322 Kind : svcImport .TypeMeta .Kind ,
@@ -252,37 +330,31 @@ func createDerivedServiceStruct(svc *model.Service, svcImport *v1alpha1.ServiceI
252330 },
253331 Spec : v1.ServiceSpec {
254332 Type : v1 .ServiceTypeClusterIP ,
255- Ports : extractServicePorts (svc ),
333+ Ports : extractServicePorts (endpoints ),
256334 },
257335 }
258336}
259337
260- func createEndpointSlicesStruct (svcImport * v1alpha1.ServiceImport , cloudMapSvc * model.Service , svc * v1.Service ) []* discovery.EndpointSlice {
261- slices := make ([]* discovery.EndpointSlice , 0 )
262-
338+ func createEndpointForSlice (svc * v1.Service , ip string ) discovery.Endpoint {
263339 t := true
264340
265- endpoints := make ([]discovery.Endpoint , 0 )
266- for _ , ep := range cloudMapSvc .Endpoints {
267- endpoints = append (endpoints , discovery.Endpoint {
268- Addresses : []string {ep .IP },
269- Conditions : discovery.EndpointConditions {
270- Ready : & t ,
271- },
272- TargetRef : & v1.ObjectReference {
273- Kind : "Service" ,
274- Namespace : svc .Namespace ,
275- Name : svc .Name ,
276- UID : svc .ObjectMeta .UID ,
277- ResourceVersion : svc .ObjectMeta .ResourceVersion ,
278- },
279- })
341+ return discovery.Endpoint {
342+ Addresses : []string {ip },
343+ Conditions : discovery.EndpointConditions {
344+ Ready : & t ,
345+ },
346+ TargetRef : & v1.ObjectReference {
347+ Kind : "Service" ,
348+ Namespace : svc .Namespace ,
349+ Name : svc .Name ,
350+ UID : svc .ObjectMeta .UID ,
351+ ResourceVersion : svc .ObjectMeta .ResourceVersion ,
352+ },
280353 }
354+ }
281355
282- // TODO split slices in case there are more than 1000 endpoints
283- // see https://github.com/kubernetes/enhancements/blob/master/keps/sig-network/0752-endpointslices/README.md
284-
285- slices = append (slices , & discovery.EndpointSlice {
356+ func createEndpointSliceStruct (svcImport * v1alpha1.ServiceImport , svc * v1.Service , endpoints []discovery.Endpoint , ports []discovery.EndpointPort ) * discovery.EndpointSlice {
357+ return & discovery.EndpointSlice {
286358 ObjectMeta : metav1.ObjectMeta {
287359 Labels : map [string ]string {
288360 discovery .LabelServiceName : svc .Name , // derived Service name
@@ -297,15 +369,13 @@ func createEndpointSlicesStruct(svcImport *v1alpha1.ServiceImport, cloudMapSvc *
297369 },
298370 AddressType : discovery .AddressTypeIPv4 ,
299371 Endpoints : endpoints ,
300- Ports : extractEndpointPorts (cloudMapSvc ),
301- })
302-
303- return slices
372+ Ports : ports ,
373+ }
304374}
305375
306- func extractServicePorts (svc * model.Service ) []v1.ServicePort {
376+ func extractServicePorts (endpoints [] * model.Endpoint ) []v1.ServicePort {
307377 uniquePorts := make (map [string ]model.Port )
308- for _ , ep := range svc . Endpoints {
378+ for _ , ep := range endpoints {
309379 uniquePorts [ep .ServicePort .GetID ()] = ep .ServicePort
310380 }
311381
@@ -317,16 +387,17 @@ func extractServicePorts(svc *model.Service) []v1.ServicePort {
317387 return servicePorts
318388}
319389
320- func extractEndpointPorts (svc * model.Service ) []discovery.EndpointPort {
390+ func extractEndpointPorts (endpoints [] * model.Endpoint ) []discovery.EndpointPort {
321391 uniquePorts := make (map [string ]model.Port )
322- for _ , ep := range svc . Endpoints {
392+ for _ , ep := range endpoints {
323393 uniquePorts [ep .EndpointPort .GetID ()] = ep .EndpointPort
324394 }
325395
326396 endpointPorts := make ([]discovery.EndpointPort , 0 , len (uniquePorts ))
327397 for _ , endpointPort := range uniquePorts {
328398 endpointPorts = append (endpointPorts , PortToEndpointPort (endpointPort ))
329399 }
400+
330401 return endpointPorts
331402}
332403
@@ -356,6 +427,7 @@ func portsEqual(svcImport *v1alpha1.ServiceImport, svc *v1.Service) bool {
356427 svcPorts = append (svcPorts , servicePortToServiceImport (p ))
357428 }
358429
430+ // TODO: consider order
359431 return reflect .DeepEqual (impPorts , svcPorts )
360432}
361433
0 commit comments