Skip to content

Commit f034ec7

Browse files
Implement Prioriy and Serial settings for discovered handlers
Signed-off-by: Danil-Grigorev <[email protected]>
1 parent 8863bd1 commit f034ec7

File tree

14 files changed

+237
-13
lines changed

14 files changed

+237
-13
lines changed

config/crd/bases/runtime.cluster.x-k8s.io_extensionconfigs.yaml

+9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exp/runtime/api/v1alpha1/extensionconfig_types.go

+8
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,14 @@ type ExtensionHandler struct {
131131
// Defaults to Fail if not set.
132132
// +optional
133133
FailurePolicy *FailurePolicy `json:"failurePolicy,omitempty"`
134+
135+
// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
136+
// +optional
137+
Priority int32 `json:"priority,omitempty"`
138+
139+
// Serial defines if the blocked hook is allowed to run in parallel with others.
140+
// +optional
141+
Serial bool `json:"serial,omitempty"`
134142
}
135143

136144
// GroupVersionHook defines the runtime hook when the ExtensionHandler is called.

exp/runtime/hooks/api/v1alpha1/discovery_types.go

+8
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,14 @@ type ExtensionHandler struct {
6363
// failurePolicy defines how failures in calls to the ExtensionHandler should be handled by a client.
6464
// This is defaulted to FailurePolicyFail if not defined.
6565
FailurePolicy *FailurePolicy `json:"failurePolicy,omitempty"`
66+
67+
// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
68+
// +optional
69+
Priority int32 `json:"priority,omitempty"`
70+
71+
// Serial defines if the blocked hook is allowed to run in parallel with others.
72+
// +optional
73+
Serial bool `json:"serial,omitempty"`
6674
}
6775

6876
// GroupVersionHook defines the runtime hook when the ExtensionHandler is called.

exp/runtime/hooks/api/v1alpha1/zz_generated.openapi.go

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

exp/runtime/internal/controllers/extensionconfig_controller_test.go

+6
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,10 @@ func TestExtensionReconciler_discoverExtensionConfig(t *testing.T) {
229229
handlers := discoveredExtensionConfig.Status.Handlers
230230
g.Expect(handlers).To(HaveLen(1))
231231
g.Expect(handlers[0].Name).To(Equal("first.ext1"))
232+
g.Expect(handlers[0].RequestHook.Hook).To(Equal("FakeHook"))
233+
g.Expect(handlers[0].RequestHook.APIVersion).To(Equal("test.runtime.cluster.x-k8s.io/v1alpha1"))
234+
g.Expect(handlers[0].Serial).To(BeTrue())
235+
g.Expect(handlers[0].Priority).To(Equal(int32(100)))
232236

233237
// Expect exactly one condition and expect the condition to have type RuntimeExtensionDiscoveredCondition and
234238
// Status true.
@@ -345,6 +349,8 @@ func discoveryHandler(handlerList ...string) func(http.ResponseWriter, *http.Req
345349
Hook: "FakeHook",
346350
APIVersion: fakev1alpha1.GroupVersion.String(),
347351
},
352+
Serial: true,
353+
Priority: 100,
348354
})
349355
}
350356
response := &runtimehooksv1.DiscoveryResponse{

exp/runtime/server/server.go

+8
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,12 @@ type ExtensionHandler struct {
150150
// If left undefined, this will be defaulted to FailurePolicyFail when processing the answer to the discovery
151151
// call for this server.
152152
FailurePolicy *runtimehooksv1.FailurePolicy
153+
154+
// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
155+
Priority int32
156+
157+
// Serial defines if the blocked hook is allowed to run in parallel with others.
158+
Serial bool
153159
}
154160

155161
// AddExtensionHandler adds an extension handler to the server.
@@ -268,6 +274,8 @@ func discoveryHandler(handlers map[string]ExtensionHandler) func(context.Context
268274
},
269275
TimeoutSeconds: handler.TimeoutSeconds,
270276
FailurePolicy: handler.FailurePolicy,
277+
Priority: handler.Priority,
278+
Serial: handler.Serial,
271279
})
272280
}
273281

internal/runtime/client/client.go

+18
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ func (c *client) Discover(ctx context.Context, extensionConfig *runtimev1.Extens
143143
},
144144
TimeoutSeconds: handler.TimeoutSeconds,
145145
FailurePolicy: (*runtimev1.FailurePolicy)(handler.FailurePolicy),
146+
Priority: handler.Priority,
147+
Serial: handler.Serial,
146148
},
147149
)
148150
}
@@ -193,6 +195,7 @@ func (c *client) CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook
193195

194196
log.V(4).Info(fmt.Sprintf("Calling all extensions of hook %q", hookName))
195197
responses := []runtimehooksv1.ResponseObject{}
198+
retry := false
196199
for _, registration := range registrations {
197200
// Creates a new instance of the response parameter.
198201
responseObject, err := c.catalog.NewResponse(gvh)
@@ -212,13 +215,28 @@ func (c *client) CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook
212215
continue
213216
}
214217

218+
if registration.Serial && retry {
219+
log.V(5).Info(fmt.Sprintf("Serial handler %q waits for blocking response to complete", registration.Name))
220+
break
221+
}
222+
215223
err = c.CallExtension(ctx, hook, forObject, registration.Name, request, tmpResponse)
216224
// If one of the extension handlers fails lets short-circuit here and return early.
217225
if err != nil {
218226
log.Error(err, "failed to call extension handlers")
219227
return errors.Wrapf(err, "failed to call extension handlers for hook %q", gvh.GroupHook())
220228
}
229+
230+
if retryResponse, isRetry := tmpResponse.(runtimehooksv1.RetryResponseObject); isRetry && !retry && retryResponse.GetRetryAfterSeconds() > 0 {
231+
retry = isRetry
232+
}
233+
221234
responses = append(responses, tmpResponse)
235+
236+
if registration.Serial && retry {
237+
log.V(5).Info(fmt.Sprintf("Serial handler %q is blocking hook until it is completed", registration.Name))
238+
break
239+
}
222240
}
223241

224242
// Aggregate all responses into a single response.

internal/runtime/client/client_test.go

+78-1
Original file line numberDiff line numberDiff line change
@@ -984,6 +984,17 @@ func TestClient_CallAllExtensions(t *testing.T) {
984984
},
985985
}
986986

987+
secondBlockingConfig := extensionConfig.DeepCopy()
988+
secondBlockingConfig.Status.Handlers[0].Priority = 2
989+
secondBlockingConfig.Status.Handlers[1].Priority = 1
990+
secondBlockingConfig.Status.Handlers[1].Serial = true
991+
secondBlockingConfig.Status.Handlers[0].RequestHook.Hook = "RetryableFakeHook"
992+
secondBlockingConfig.Status.Handlers[1].RequestHook.Hook = "RetryableFakeHook"
993+
secondBlockingConfig.Status.Handlers[2].RequestHook.Hook = "RetryableFakeHook"
994+
995+
secondBlockingWithPriorityConfig := secondBlockingConfig.DeepCopy()
996+
secondBlockingWithPriorityConfig.Status.Handlers[1].Priority = 3
997+
987998
type args struct {
988999
hook runtimecatalog.Hook
9891000
request runtimehooksv1.RequestObject
@@ -1072,6 +1083,55 @@ func TestClient_CallAllExtensions(t *testing.T) {
10721083
},
10731084
wantErr: true,
10741085
},
1086+
{
1087+
name: "should succeed and wait on previous blocking responses for serial handler",
1088+
registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingConfig},
1089+
testServer: testServerConfig{
1090+
start: true,
1091+
responses: map[string]testServerResponse{
1092+
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/first-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1),
1093+
// second and third extension has no handler.
1094+
},
1095+
},
1096+
args: args{
1097+
hook: fakev1alpha1.RetryableFakeHook,
1098+
request: &fakev1alpha1.RetryableFakeRequest{},
1099+
response: &fakev1alpha1.RetryableFakeResponse{},
1100+
},
1101+
},
1102+
{
1103+
name: "should succeed and wait on blocking serial handler",
1104+
registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingConfig},
1105+
testServer: testServerConfig{
1106+
start: true,
1107+
responses: map[string]testServerResponse{
1108+
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/first-extension.*": response(runtimehooksv1.ResponseStatusSuccess),
1109+
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/second-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1),
1110+
// third-extension has no handler.
1111+
},
1112+
},
1113+
args: args{
1114+
hook: fakev1alpha1.RetryableFakeHook,
1115+
request: &fakev1alpha1.RetryableFakeRequest{},
1116+
response: &fakev1alpha1.RetryableFakeResponse{},
1117+
},
1118+
},
1119+
{
1120+
name: "should succeed and wait on blocking serial handler, which is called with priority",
1121+
registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingWithPriorityConfig},
1122+
testServer: testServerConfig{
1123+
start: true,
1124+
responses: map[string]testServerResponse{
1125+
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/second-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1),
1126+
// second and third extension has no handler.
1127+
},
1128+
},
1129+
args: args{
1130+
hook: fakev1alpha1.RetryableFakeHook,
1131+
request: &fakev1alpha1.RetryableFakeRequest{},
1132+
response: &fakev1alpha1.RetryableFakeResponse{},
1133+
},
1134+
},
10751135
{
10761136
name: "should fail when one of the ExtensionHandlers returns 404",
10771137
registeredExtensionConfigs: []runtimev1.ExtensionConfig{extensionConfig},
@@ -1317,6 +1377,20 @@ func response(status runtimehooksv1.ResponseStatus) testServerResponse {
13171377
}
13181378
}
13191379

1380+
func retryResponse(status runtimehooksv1.ResponseStatus, retrySeconds int32) testServerResponse {
1381+
return testServerResponse{
1382+
response: &fakev1alpha1.RetryableFakeResponse{
1383+
CommonRetryResponse: runtimehooksv1.CommonRetryResponse{
1384+
RetryAfterSeconds: retrySeconds,
1385+
CommonResponse: runtimehooksv1.CommonResponse{
1386+
Status: status,
1387+
},
1388+
},
1389+
},
1390+
responseStatusCode: http.StatusOK,
1391+
}
1392+
}
1393+
13201394
func createSecureTestServer(server testServerConfig, callbacks ...func()) *httptest.Server {
13211395
mux := http.NewServeMux()
13221396
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
@@ -1335,7 +1409,10 @@ func createSecureTestServer(server testServerConfig, callbacks ...func()) *httpt
13351409
panic(err)
13361410
}
13371411
w.WriteHeader(resp.responseStatusCode)
1338-
_, _ = w.Write(respBody)
1412+
_, err = w.Write(respBody)
1413+
if err != nil {
1414+
panic(err)
1415+
}
13391416
return
13401417
}
13411418

internal/runtime/registry/registry.go

+15
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package registry
1818

1919
import (
20+
"sort"
2021
"sync"
2122

2223
"github.com/pkg/errors"
@@ -84,6 +85,13 @@ type ExtensionRegistration struct {
8485

8586
// Settings captures additional information sent in call to the RuntimeExtensions.
8687
Settings map[string]string
88+
89+
// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
90+
Priority int32
91+
92+
// Serial defines if the hook should be executed serially. This ensures that previously pending hooks are finished
93+
// first, as well as current hook reached completion, before moving to the next one.
94+
Serial bool
8795
}
8896

8997
// extensionRegistry is an implementation of ExtensionRegistry.
@@ -210,6 +218,11 @@ func (r *extensionRegistry) List(gh runtimecatalog.GroupHook) ([]*ExtensionRegis
210218
l = append(l, registration)
211219
}
212220
}
221+
222+
sort.SliceStable(l, func(i, j int) bool {
223+
return l[i].Priority > l[j].Priority
224+
})
225+
213226
return l, nil
214227
}
215228

@@ -263,6 +276,8 @@ func (r *extensionRegistry) add(extensionConfig *runtimev1.ExtensionConfig) erro
263276
TimeoutSeconds: e.TimeoutSeconds,
264277
FailurePolicy: e.FailurePolicy,
265278
Settings: extensionConfig.Spec.Settings,
279+
Priority: e.Priority,
280+
Serial: e.Serial,
266281
})
267282
}
268283

test/e2e/cluster_upgrade.go

+6-4
Original file line numberDiff line numberDiff line change
@@ -184,10 +184,12 @@ func ClusterUpgradeConformanceSpec(ctx context.Context, inputGetter func() Clust
184184
WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
185185
WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
186186
WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
187-
PreWaitForControlPlaneToBeUpgraded: func() {
188-
if input.PreWaitForControlPlaneToBeUpgraded != nil {
189-
input.PreWaitForControlPlaneToBeUpgraded(input.BootstrapClusterProxy, namespace.Name, clusterName)
190-
}
187+
PreWaitForControlPlaneToBeUpgraded: []func(){
188+
func() {
189+
if input.PreWaitForControlPlaneToBeUpgraded != nil {
190+
input.PreWaitForControlPlaneToBeUpgraded(input.BootstrapClusterProxy, namespace.Name, clusterName)
191+
}
192+
},
191193
},
192194
})
193195
} else {

test/e2e/cluster_upgrade_runtimesdk.go

+27-6
Original file line numberDiff line numberDiff line change
@@ -277,12 +277,20 @@ func ClusterUpgradeWithRuntimeSDKSpec(ctx context.Context, inputGetter func() Cl
277277
WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
278278
WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
279279
WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
280-
PreWaitForControlPlaneToBeUpgraded: func() {
281-
beforeClusterUpgradeTestHandler(ctx,
282-
input.BootstrapClusterProxy.GetClient(),
283-
clusterRef,
284-
input.E2EConfig.MustGetVariable(KubernetesVersionUpgradeTo),
285-
input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"))
280+
PreWaitForControlPlaneToBeUpgraded: []func(){
281+
func() {
282+
justBeforeClusterUpgradeTestHandler(ctx,
283+
input.BootstrapClusterProxy.GetClient(),
284+
clusterRef,
285+
input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"))
286+
},
287+
func() {
288+
beforeClusterUpgradeTestHandler(ctx,
289+
input.BootstrapClusterProxy.GetClient(),
290+
clusterRef,
291+
input.E2EConfig.MustGetVariable(KubernetesVersionUpgradeTo),
292+
input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"))
293+
},
286294
},
287295
PreWaitForWorkersToBeUpgraded: func() {
288296
machineSetPreflightChecksTestHandler(ctx,
@@ -580,6 +588,19 @@ func beforeClusterUpgradeTestHandler(ctx context.Context, c client.Client, clust
580588
}, intervals)
581589
}
582590

591+
// justBeforeClusterUpgradeTestHandler calls runtimeHookTestHandler with a blocking function which returns false if
592+
// JustBeforeClusterUpgrade hook passed, and BeforeClusterUpgrade hook is pending.
593+
func justBeforeClusterUpgradeTestHandler(ctx context.Context, c client.Client, cluster types.NamespacedName, intervals []interface{}) {
594+
hookName := "JustBeforeClusterUpgrade"
595+
runtimeHookTestHandler(ctx, c, cluster, hookName, true, func() bool {
596+
// Wait for JustBeforeClusterUpgrade to pass and BeforeClusterUpgrade pending
597+
return checkLifecycleHookResponses(ctx, c, cluster, map[string]string{
598+
"JustBeforeClusterUpgrade": "Status: Success, RetryAfterSeconds: 0",
599+
"BeforeClusterUpgrade": "Status: Success, RetryAfterSeconds: 5",
600+
}) != nil
601+
}, intervals)
602+
}
603+
583604
// afterControlPlaneUpgradeTestHandler calls runtimeHookTestHandler with a blocking function which returns false if any
584605
// MachineDeployment in the Cluster has upgraded to the target Kubernetes version.
585606
func afterControlPlaneUpgradeTestHandler(ctx context.Context, c client.Client, cluster types.NamespacedName, version string, intervals []interface{}) {

0 commit comments

Comments
 (0)