- Node Drain
- Drain Utilities
- Drain
- Drain Types
- drain.PodVolumeInfo
- drain.Options.evictPod
- drain.Options.deletePod
- drain.Options.getPodsForDeletion
- drain.Options.getPVList
- drain.Options.getVolIDsFromDriver
- drain.Options.doAccountingOfPvs
- drain.filterPodsWithPv
- drain.Options.waitForDetach
- drain.Options.waitForReattach
- drain.Options.waitForDelete
- drain.Options.RunDrain
- drain.Options.evictPodsWithoutPv
- drain.Options.evictPodsWithPv
- drain.Options.evictPodsWithPVInternal
Node Drain
Node Drain code is in github.com/gardener/machine-controller-manager/pkg/util/provider/drain/drain.go
Drain Utilities
VolumeAttachmentHandler
pkg/util/provider/drain.VolumeAttachmentHandler is a handler used to distribute
incoming k8s.io/api/storage/v1.VolumeAttachment requests to a number of workers, where each worker is a channel of type *VolumeAttachment.
A k8s.io/api/storage/v1.VolumeAttachment is a non-namespaced k8s object that captures the intent to attach or detach the specified volume to/from the specified node. See VolumeAttachment
type VolumeAttachmentHandler struct {
sync.Mutex
workers []chan *storagev1.VolumeAttachment
}
// NewVolumeAttachmentHandler returns a new VolumeAttachmentHandler
func NewVolumeAttachmentHandler() *VolumeAttachmentHandler {
return &VolumeAttachmentHandler{
Mutex: sync.Mutex{},
workers: []chan *storagev1.VolumeAttachment{},
}
}
VolumeAttachmentHandler.AddWorker
AddWorker appends a buffered channel of type *VolumeAttachment, with capacity 20, to the workers slice in VolumeAttachmentHandler. The assumption is that no more than 20 unprocessed objects exist at a given time; once a worker's buffer is full, further writes to that channel are dropped. See the dispatch method.
func (v *VolumeAttachmentHandler) AddWorker() chan *storagev1.VolumeAttachment {
// chanSize is the channel buffer size to hold requests.
// It assumes that no more than 20 unprocessed objects exist at a given time;
// once the buffer is full, dispatch starts dropping writes to this channel.
const chanSize = 20
klog.V(4).Infof("Adding new worker. Current active workers %d - %v", len(v.workers), v.workers)
v.Lock()
defer v.Unlock()
newWorker := make(chan *storagev1.VolumeAttachment, chanSize)
v.workers = append(v.workers, newWorker)
klog.V(4).Infof("Successfully added new worker %v. Current active workers %d - %v", newWorker, len(v.workers), v.workers)
return newWorker
}
VolumeAttachmentHandler.dispatch
The dispatch method is responsible for distributing incoming VolumeAttachments to all registered worker channels.
func (v *VolumeAttachmentHandler) dispatch(obj interface{}) {
if len(v.workers) == 0 {
// As no workers are registered, nothing to do here.
return
}
volumeAttachment := obj.(*storagev1.VolumeAttachment)
v.Lock()
defer v.Unlock()
for i, worker := range v.workers {
select {
// submit volume attachment to the worker channel if channel is not full
case worker <- volumeAttachment:
default:
klog.Warningf("Worker %d/%v is full. Discarding value.", i, worker)
// TODO: Umm..isn't this problematic if we miss this ?
}
}
}
The Add|Update methods below delegate to dispatch. The usage of this utility involves specifying the add/update methods below as the event handler callbacks on an instance of k8s.io/client-go/informers/storage/v1.VolumeAttachmentInformer. This way incoming volume attachments are distributed to several worker channels.
func (v *VolumeAttachmentHandler) AddVolumeAttachment(obj interface{}) {
v.dispatch(obj)
}
func (v *VolumeAttachmentHandler) UpdateVolumeAttachment(oldObj, newObj interface{}) {
v.dispatch(newObj)
}
VolumeAttachmentHandler Initialization in MC
During construction of the MC controller struct, the handler's Add/Update callbacks are registered as event handlers on the volume attachment informer:
func NewController(...) {
    //...
    controller.volumeAttachmentHandler = drain.NewVolumeAttachmentHandler()
    volumeAttachmentInformer.Informer().AddEventHandler(
        cache.ResourceEventHandlerFuncs{
            AddFunc:    controller.volumeAttachmentHandler.AddVolumeAttachment,
            UpdateFunc: controller.volumeAttachmentHandler.UpdateVolumeAttachment,
        })
    //...
}
Drain
Drain Types
Drain Constants
- PodEvictionRetryInterval is the interval in which to retry eviction for pods.
- GetPvDetailsMaxRetries is the maximum number of retries to get PV details using the PersistentVolumeLister or PersistentVolumeClaimLister.
- GetPvDetailsRetryInterval is the interval in which to retry getting PV details.
const (
PodEvictionRetryInterval = time.Second * 20
GetPvDetailsMaxRetries = 3
GetPvDetailsRetryInterval = time.Second * 5
)
drain.Options
drain.Options holds the configurable options used while draining a node before deletion.
NOTE: Unused fields/Fields with constant defaults omitted for brevity
type Options struct {
client kubernetes.Interface
kubernetesVersion *semver.Version
Driver driver.Driver
drainStartedOn time.Time
drainEndedOn time.Time
ErrOut io.Writer
ForceDeletePods bool
MaxEvictRetries int32
PvDetachTimeout time.Duration
PvReattachTimeout time.Duration
nodeName string
Out io.Writer
pvcLister corelisters.PersistentVolumeClaimLister
pvLister corelisters.PersistentVolumeLister
pdbV1Lister policyv1listers.PodDisruptionBudgetLister
nodeLister corelisters.NodeLister
volumeAttachmentHandler *VolumeAttachmentHandler
Timeout time.Duration
}
drain.PodVolumeInfo
drain.PodVolumeInfo is the struct used to encapsulate the PV names and PV IDs for all the PVs attached to a pod.
type PodVolumeInfo struct {
    persistentVolumeList []string
    volumeList []string
}
NOTE: The struct fields are badly named.
- PodVolumeInfo.persistentVolumeList is a slice of persistent volume names. This comes from PersistentVolumeSpec.VolumeName.
- PodVolumeInfo.volumeList is a slice of persistent volume IDs, obtained using driver.GetVolumeIDs given the PV spec. This is generally the CSI volume ID.
drain.Options.evictPod
func (o *Options) evictPod(ctx context.Context, pod *corev1.Pod, policyGroupVersion string) error
drain.Options.evictPod is a simple helper method to evict a Pod using Eviction API
- TODO: GracePeriodSeconds in the code is useless here and should be removed, as it is always -1.
- TODO: Currently this method uses the old k8s.io/api/policy/v1beta1. It must be changed to k8s.io/api/policy/v1. (A sketch follows the flowchart below.)
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%
flowchart TD
Begin(("" ))
-->InitTypeMeta["
typeMeta:= metav1.TypeMeta{
APIVersion: policyGroupVersion,
Kind: 'Eviction',
},
"]
-->InitObjectMeta["
objectMeta := metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
"]
-->InitEviction["
eviction := &policyv1beta1.Eviction{TypeMeta: typeMeta, ObjectMeta: objectMeta}
"]
-->EvictPod["
err := o.client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(ctx, eviction)
"]
-->ReturnErr(("return err"))
style InitEviction text-align:left
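Per the second TODO above, a minimal sketch (an illustration, not the current MCM code) of the same eviction done against k8s.io/api/policy/v1:
import (
    "context"

    corev1 "k8s.io/api/core/v1"
    policyv1 "k8s.io/api/policy/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

// evictPodV1 evicts a pod via the policy/v1 Eviction subresource.
func evictPodV1(ctx context.Context, client kubernetes.Interface, pod *corev1.Pod) error {
    eviction := &policyv1.Eviction{
        ObjectMeta: metav1.ObjectMeta{Name: pod.Name, Namespace: pod.Namespace},
    }
    // No GracePeriodSeconds is set, so the API server applies the pod's own default.
    return client.PolicyV1().Evictions(eviction.Namespace).Evict(ctx, eviction)
}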
drain.Options.deletePod
Simple helper method to delete a Pod. It just delegates to PodInterface.Delete:
func (o *Options) deletePod(ctx context.Context, pod *corev1.Pod) error {
    return o.client.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
}
drain.Options.getPodsForDeletion
drain.getPodsForDeletion returns all the pods we're going to delete. If there are any pods preventing us from deleting, we return that list in an error.
func (o *Options) getPodsForDeletion(ctx context.Context)
(pods []corev1.Pod, err error)
- Get all pods associated with the node:
  podList, err := o.client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeName}).String()})
- Iterate through podList and apply a series of pod filters:
  - Remove mirror pods from consideration for deletion. See Static Pods.
  - Local storage filter. Discuss: seems useless. If a Pod has local storage, it is removed from consideration. This filter iterates through the Pod.Spec.Volumes slice and checks whether Volume.EmptyDir is non-nil to determine whether the pod uses local storage.
  - A Pod whose Pod.Status.Phase is PodSucceeded or PodFailed is eligible for deletion.
  - If a Pod has a controller owner reference, it is eligible for deletion. (TODO: Unsure why this makes a difference anyway.)
  - The final pod filter, daemonsetFilter, seems useless. Discuss.

(A short sketch of the first two filters follows.)
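A sketch of the first two filters described above, with hypothetical helper names (the real filters live in drain.go):
import corev1 "k8s.io/api/core/v1"

// isMirrorPod reports whether the pod is a static (mirror) pod;
// mirror pods carry the kubernetes.io/config.mirror annotation.
func isMirrorPod(pod *corev1.Pod) bool {
    _, ok := pod.Annotations[corev1.MirrorPodAnnotationKey]
    return ok
}

// hasLocalStorage reports whether any of the pod's volumes is an EmptyDir.
func hasLocalStorage(pod *corev1.Pod) bool {
    for _, volume := range pod.Spec.Volumes {
        if volume.EmptyDir != nil {
            return true
        }
    }
    return false
}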
drain.Options.getPVList
NOTE: Should be called getPVNames. Gets a slice of the persistent volume names bound to the Pod through its claims. Contains time.Sleep and bounded retry handling. Unsure if this is the best way. Discuss.
func (o *Options) getPVList(pod *corev1.Pod) (pvNames []string, err error)
- Iterate over pod.Spec.Volumes.
- If the volume.PersistentVolumeClaim reference is not nil, get the PersistentVolumeClaim via o.pvcLister using vol.PersistentVolumeClaim.ClaimName.
  - Error handling retries the lookup until GetPvDetailsMaxRetries is reached, with interval GetPvDetailsRetryInterval between attempts.
- Add pvc.Spec.VolumeName to pvNames.
- Return pvNames. (A condensed sketch follows.)
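A condensed sketch of the lookup-with-retry described above (hypothetical function name; error handling simplified):
import (
    "time"

    corev1 "k8s.io/api/core/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
)

func getPVNames(pvcLister corelisters.PersistentVolumeClaimLister, pod *corev1.Pod) ([]string, error) {
    var pvNames []string
    for _, vol := range pod.Spec.Volumes {
        if vol.PersistentVolumeClaim == nil {
            continue
        }
        var (
            pvc *corev1.PersistentVolumeClaim
            err error
        )
        // Retry the PVC lookup up to GetPvDetailsMaxRetries times.
        for retry := 0; retry < GetPvDetailsMaxRetries; retry++ {
            pvc, err = pvcLister.PersistentVolumeClaims(pod.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
            if err == nil {
                break
            }
            time.Sleep(GetPvDetailsRetryInterval)
        }
        if err != nil {
            return nil, err
        }
        // pvc.Spec.VolumeName is the name of the bound PersistentVolume.
        pvNames = append(pvNames, pvc.Spec.VolumeName)
    }
    return pvNames, nil
}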
drain.Options.getVolIDsFromDriver
Given a slice of PV Names, this method gets the corresponding volume ids from the driver.
- It does this by first getting the PersistentVolume using o.pvLister.Get(pvName) for each PV name and appending its spec to the pvSpecs slice of type []*PersistentVolumeSpec. See k8s.io/client-go/listers/core/v1.PersistentVolumeLister.
- Retry handling is implemented while looking up each pvName, until GetPvDetailsMaxRetries is reached, with a sleep of GetPvDetailsRetryInterval between retry attempts.
- Once the pvSpecs slice is populated, it constructs a driver.GetVolumeIDsRequest from it, invokes driver.GetVolumeIDs(driver.GetVolumeIDsRequest) to obtain the driver.GetVolumeIDsResponse, and returns driver.GetVolumeIDsResponse.VolumeIDs. (A hedged sketch follows the signature below.)
TODO: BUG ? In case the PV is not found or retry limit is reached the slice of volume ids will not have a 1:1 correspondence with slice of PV names passed in.
func (o *Options) getVolIDsFromDriver(ctx context.Context, pvNames []string) ([]string, error)
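A hedged sketch of this flow (retry handling omitted; the driver request/response field names are as recalled from the driver package and should be treated as an assumption):
import (
    "context"

    "github.com/gardener/machine-controller-manager/pkg/util/provider/driver"
    corev1 "k8s.io/api/core/v1"
    corelisters "k8s.io/client-go/listers/core/v1"
)

func getVolIDsSketch(ctx context.Context, pvLister corelisters.PersistentVolumeLister,
    d driver.Driver, pvNames []string) ([]string, error) {
    var pvSpecs []*corev1.PersistentVolumeSpec
    for _, pvName := range pvNames {
        pv, err := pvLister.Get(pvName)
        if err != nil {
            // Skipping here is what breaks the 1:1 correspondence noted in the BUG above.
            continue
        }
        pvSpecs = append(pvSpecs, &pv.Spec)
    }
    resp, err := d.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: pvSpecs})
    if err != nil {
        return nil, err
    }
    return resp.VolumeIDs, nil
}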
drain.Options.doAccountingOfPvs
drain.Options.doAccountingOfPvs returns a map of the pod key pod.Namespace + '/' + pod.Name to a PodVolumeInfo struct which holds a slice of PV names and PV IDs.
NOTES:
- See filterSharedPVs
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%
flowchart TD
Begin((" "))
-->Init["
podKey2VolNamesMap = make(map[string][]string)
podVolumeInfoMap = make(map[string]PodVolumeInfo)
"]
-->RangePods["
for pod := range pods
"]
-->PopPod2VolNames["
podKey2VolNamesMap[pod.Namespace + '/' + pod.Name] = o.getPVList(pod)
"]
--loop-->RangePods
PopPod2VolNames--done-->FilterSharedPVs["
filterSharedPVs(podKey2VolNamesMap)
// filters out the PVs that are shared among pods.
"]
-->RangePodKey2VolNamesMap["
for podKey, volNames := range podKey2VolNamesMap
"]
-->GetVolumeIds["
volumeIds, err := o.getVolIDsFromDriver(ctx, volNames)
if err != nil continue; //skip set of volumes
"]
-->InitPodVolInfo["
podVolumeInfo := PodVolumeInfo{
persistentVolumeList: volNames,
volumeList: volumeIds
}
//struct field names are bad.
"]
-->PopPodVolInfoMap["
podVolumeInfoMap[podKey] = podVolumeInfo
"]
--loop-->RangePodKey2VolNamesMap
PopPodVolInfoMap--done-->Return(("return podVolumeInfoMap"))
drain.filterPodsWithPv
NOTE: should have been named partitionPodsWithPVC
Utility function that iterates through the given pods; for each pod it iterates through pod.Spec.Volumes and checks volume.PersistentVolumeClaim. If the claim reference is not nil, the pod is added to the podsWithPV slice, else to the podsWithoutPV slice. (A sketch follows the signature below.)
func filterPodsWithPv(pods []corev1.Pod)
(podsWithPV []*corev1.Pod, podsWithoutPV []*corev1.Pod)
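Since the partition logic is small, here is a sketch of it in full, using the suggested name partitionPodsWithPVC (note the &pods[i] to avoid aliasing a loop variable):
import corev1 "k8s.io/api/core/v1"

func partitionPodsWithPVC(pods []corev1.Pod) (podsWithPV, podsWithoutPV []*corev1.Pod) {
    for i := range pods {
        hasPVC := false
        for _, vol := range pods[i].Spec.Volumes {
            if vol.PersistentVolumeClaim != nil {
                hasPVC = true
                break
            }
        }
        if hasPVC {
            podsWithPV = append(podsWithPV, &pods[i])
        } else {
            podsWithoutPV = append(podsWithoutPV, &pods[i])
        }
    }
    return podsWithPV, podsWithoutPV
}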
drain.Options.waitForDetach
func (o *Options) waitForDetach(ctx context.Context,
podVolumeInfo PodVolumeInfo,
nodeName string) error
Summary:
- Initialize a boolean found to true (representing that a volume is still attached to the node).
- Begin a loop while found is true:
  - Use a select to check whether a signal is received from context.Done() (i.e. the context was cancelled). If so, return an error with the message that a timeout occurred while waiting for the PVs to be detached.
  - Set found to false.
  - Get the Node associated with nodeName using the nodeLister and assign it to node. If there is an error, return from the function.
  - Get node.Status.VolumesAttached, which returns a []AttachedVolume, and assign it to attachedVols.
    - Return nil if this slice is empty.
  - Begin iterating over podVolumeInfo.volumeList, assigning volumeID in the parent iteration. Label this iteration LookUpVolume.
    - Begin an inner iteration over attachedVols, assigning attachedVol in the nested iteration.
    - If attachedVol.Name contains volumeID (the attached volume name embeds the volume ID), then this volume is still attached to the node:
      - Set found to true.
      - Sleep for VolumeDetachPollInterval (5 seconds).
      - Break out of LookUpVolume.

(A condensed sketch follows.)
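A condensed sketch of the loop above (hypothetical function name; error messages simplified):
import (
    "context"
    "fmt"
    "strings"
    "time"

    corelisters "k8s.io/client-go/listers/core/v1"
)

func waitForDetachSketch(ctx context.Context, nodeLister corelisters.NodeLister,
    nodeName string, volumeIDs []string) error {
    found := true
    for found {
        select {
        case <-ctx.Done():
            return fmt.Errorf("timeout while waiting for PVs to detach from node %q", nodeName)
        default:
        }
        found = false
        node, err := nodeLister.Get(nodeName)
        if err != nil {
            return err
        }
        attachedVols := node.Status.VolumesAttached
        if len(attachedVols) == 0 {
            return nil
        }
    LookUpVolume:
        for _, volumeID := range volumeIDs {
            for _, attachedVol := range attachedVols {
                // The attached volume name embeds the CSI volume ID.
                if strings.Contains(string(attachedVol.Name), volumeID) {
                    found = true
                    time.Sleep(5 * time.Second) // VolumeDetachPollInterval
                    break LookUpVolume
                }
            }
        }
    }
    return nil
}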
drain.Options.waitForReattach
Purpose: Waits for persistent volume names in podVolumeInfo.persistentVolumeList to be re-attached (to another node).
But I am still confused about why we need to call this during the drain flow. Why should we wait for PVs to be attached to another node? After all, there is no guarantee they will be attached, right?
func (o *Options) waitForReattach(ctx context.Context,
podVolumeInfo PodVolumeInfo,
previousNodeName string,
volumeAttachmentEventCh chan *VolumeAttachment) error
- Construct a map: var pvsWaitingForReattachments map[string]bool
- Initialize the above map by ranging through podVolumeInfo.persistentVolumeList, taking each persistentVolumeName and setting pvsWaitingForReattachments[persistentVolumeName] = true.
- Start a for loop:
  - Commence a select with the following cases:
    - Case: check whether the context is closed/cancelled by reading <-ctx.Done(). If so, return an error with the message that a timeout occurred while waiting for the PVs to reattach to another node.
    - Case: obtain a *VolumeAttachment by reading from the channel: incomingEvent := <-volumeAttachmentEventCh
      - Get the persistentVolumeName associated with this attachment event: persistentVolumeName := *incomingEvent.Spec.Source.PersistentVolumeName
      - Check whether this persistent volume is being tracked: pvsWaitingForReattachments[persistentVolumeName] is present.
      - Check whether the volume was attached to another node: incomingEvent.Status.Attached && incomingEvent.Spec.NodeName != previousNodeName
      - If both of the above are true, delete the entry for persistentVolumeName from the pvsWaitingForReattachments map.
  - If pvsWaitingForReattachments is empty, break from the loop.
- Log that the volumes in podVolumeInfo.persistentVolumeList have been successfully re-attached and return nil. (A condensed sketch follows.)
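A condensed sketch of the select loop above (hypothetical function name; logging omitted, and the empty-map check folded into the loop condition):
import (
    "context"
    "fmt"

    storagev1 "k8s.io/api/storage/v1"
)

func waitForReattachSketch(ctx context.Context, pvNames []string,
    previousNodeName string, volumeAttachmentEventCh chan *storagev1.VolumeAttachment) error {
    pvsWaitingForReattachments := map[string]bool{}
    for _, pvName := range pvNames {
        pvsWaitingForReattachments[pvName] = true
    }
    for len(pvsWaitingForReattachments) > 0 {
        select {
        case <-ctx.Done():
            return fmt.Errorf("timeout while waiting for PVs to reattach to another node")
        case incomingEvent := <-volumeAttachmentEventCh:
            persistentVolumeName := *incomingEvent.Spec.Source.PersistentVolumeName
            // Stop tracking the PV once it is attached to a different node.
            if pvsWaitingForReattachments[persistentVolumeName] &&
                incomingEvent.Status.Attached &&
                incomingEvent.Spec.NodeName != previousNodeName {
                delete(pvsWaitingForReattachments, persistentVolumeName)
            }
        }
    }
    return nil
}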
drain.Options.waitForDelete
NOTE: Ideally should have been named waitForPodDisappearance
pkg/util/provider/drain.Options.waitForDelete is a helper method defined on drain.Options that leverages wait.PollImmediate together with getPodFn (which gets a pod by name and namespace) to check that all the given pods have disappeared within the timeout. The set of pods that did not disappear within the timeout is returned as pendingPods. (A sketch follows.)
func (o *Options) waitForDelete(
    pods []*corev1.Pod,
    interval, timeout time.Duration,
    getPodFn func(string, string) (*corev1.Pod, error),
) (pendingPods []*corev1.Pod, err error)
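A sketch of the polling pattern (hypothetical function name; wait.PollImmediate comes from k8s.io/apimachinery/pkg/util/wait):
import (
    "time"

    corev1 "k8s.io/api/core/v1"
    apierrors "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/util/wait"
)

func waitForDeleteSketch(pods []*corev1.Pod, interval, timeout time.Duration,
    getPodFn func(namespace, name string) (*corev1.Pod, error)) ([]*corev1.Pod, error) {
    var pendingPods []*corev1.Pod
    err := wait.PollImmediate(interval, timeout, func() (bool, error) {
        pendingPods = pendingPods[:0]
        for _, pod := range pods {
            p, err := getPodFn(pod.Namespace, pod.Name)
            if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
                continue // pod is gone (or was replaced by a new instance)
            } else if err != nil {
                return false, err
            }
            pendingPods = append(pendingPods, pod)
        }
        return len(pendingPods) == 0, nil
    })
    return pendingPods, err
}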
drain.Options.RunDrain
Context: drain.Options.RunDrain is called from the MC helper method controller.drainNode which in turn is called from controller.triggerDeletionFlow when the machine.Status.LastOperation.Description contains operation machineutils.InitiateDrain.
If RunDrain returns an error, then the drain is retried at a later time by putting back the machine key into the queue. Unless the force-deletion label on the machine object is true - in which case we proceed to VM deletion.
func (o *Options) RunDrain(ctx context.Context) error
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%
flowchart TD
GetNode["node, err = .client.CoreV1().Nodes().Get(ctx, o.nodeName, metav1.GetOptions{})"]
-->ChkGetNodeErr{err != nil}
ChkGetNodeErr--Yes-->ReturnNilEarly(("return nil
(case where deletion
triggered during machine creation,
so node is nil.
TODO: should use apierrors.NotFound)"))
ChkGetNodeErr--No-->ChkNodeUnschedulable{node.Spec.Unschedulable?}
ChkNodeUnschedulable--Yes-->GetPodsForDeletion["
pods, err := o.getPodsForDeletion(ctx)
if err!=nil return err"]
ChkNodeUnschedulable--No-->CloneNode["clone := node.DeepCopy()
clone.Spec.Unschedulable = true"]
-->UpdateNode["_, err = o.client.CoreV1().Nodes().Update(ctx, clone, metav1.UpdateOptions{})
if err != nil return err
"]
UpdateNode-->GetPodsForDeletion
GetPodsForDeletion-->GetEvictionPGV["
policyGroupVersion, err := SupportEviction(o.client)
if err != nil return err
"]
-->
DefineAttemptEvict["
attemptEvict := !o.ForceDeletePods && len(policyGroupVersion) > 0
// useless boolean which confuses matters considerably.
"]
-->
DefineGetPodFn["
getPodFn := func(namespace, name string) (*corev1.Pod, error) {
return o.client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{})
}"]
-->
CreateReturnChannel["
returnCh := make(chan error, len(pods))
defer close(returnCh)
"]
-->ChkForceDelPods{"o.ForceDeletePods?"}
ChkForceDelPods--Yes-->EvictPodsWithoutPV["go o.evictPodsWithoutPv(ctx, attemptEvict, pods, policyGroupVersion, getPodFn, returnCh)
// go-routine feels un-necessary here."]
ChkForceDelPods--No-->FilterPodsWithPv["
podsWithPv, podsWithoutPv := filterPodsWithPv(pods)
"]
FilterPodsWithPv-->EvictPodsWithPv["
go o.evictPodsWithPv(ctx, attemptEvict, podsWithPv, policyGroupVersion, getPodFn, returnCh)
"]
-->EvictPodsWithoutPV1["
go o.evictPodsWithoutPv(ctx, attemptEvict, podsWithoutPv, policyGroupVersion, getPodFn, returnCh)
"]-->CreateAggregateError
EvictPodsWithoutPV-->CreateAggregateError["
var errors []error
for i := 0; i < len(pods); i++ {
err := <-returnCh
if err != nil {
errors = append(errors, err)
}
}
"]
-->ReturnAggError(("return\nerrors.NewAggregate(errors)"))
Notes:
- machine-controller-manager/pkg/util/provider/drain.SupportEviction uses the Discovery API to find out whether the server supports the eviction subresource and, if so, returns its groupVersion, or "" if it doesn't.
  - k8s.io/kubectl/pkg/drain.CheckEvictionSupport already does this.
- The attemptEvict boolean usage is confusing. Stick to drain.Options.ForceDeletePods.
- TODO: GAP? For cordoning a Node we currently just set Node.Spec.Unschedulable. But we are also supposed to set the taint node.kubernetes.io/unschedulable. The Spec-based way is supposed to be deprecated. (A hedged cordon sketch follows.)
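For the GAP TODO above, a hedged sketch of cordoning that sets both the Spec field and the taint (an illustration of the suggested fix, not the current MCM behaviour):
import (
    "context"

    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
)

func cordonNodeSketch(ctx context.Context, client kubernetes.Interface, nodeName string) error {
    node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
    if err != nil {
        return err
    }
    clone := node.DeepCopy()
    clone.Spec.Unschedulable = true
    // Also add the unschedulable taint, since relying on Spec.Unschedulable alone is deprecated.
    taint := corev1.Taint{Key: "node.kubernetes.io/unschedulable", Effect: corev1.TaintEffectNoSchedule}
    present := false
    for _, t := range clone.Spec.Taints {
        if t.Key == taint.Key && t.Effect == taint.Effect {
            present = true
            break
        }
    }
    if !present {
        clone.Spec.Taints = append(clone.Spec.Taints, taint)
    }
    _, err = client.CoreV1().Nodes().Update(ctx, clone, metav1.UpdateOptions{})
    return err
}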
drain.Options.evictPodsWithoutPv
drain method that iterates through the given pods and for each pod launches a go-routine that simply delegates to Options.evictPodWithoutPVInternal.
func (o *Options) evictPodsWithoutPv(ctx context.Context,
    attemptEvict bool,
    pods []*corev1.Pod,
    policyGroupVersion string, //eviction API's GV
    getPodFn func(namespace, name string) (*corev1.Pod, error),
    returnCh chan error) {
    for _, pod := range pods {
        go o.evictPodWithoutPVInternal(ctx, attemptEvict, pod, policyGroupVersion, getPodFn, returnCh)
    }
    return
}
drain.Options.evictPodWithoutPVInternal
drain method that either evicts or deletes a Pod, with retry handling until Options.MaxEvictRetries is reached.
func (o *Options) evictPodWithoutPVInternal(
ctx context.Context,
attemptEvict bool,
pod *corev1.Pod,
policyGroupVersion string,
getPodFn func(namespace, name string) (*corev1.Pod, error),
returnCh chan error)
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%
flowchart TD
RangePod["pod := range pods"]
RangePod-->EvictOrDelPod["go evictPodWithoutPVInternal(attemptEvict bool, pod, policyGroupVersion,pod,getPodFn,returnCh)"]
EvictOrDelPod-->Begin
subgraph "evictPodWithoutPVInternal (evicts or deletes Pod) "
Begin(("Begin"))-->SetRetry["retry := 0"]
SetRetry
-->SetAttemptEvict["if retry >= o.MaxEvictRetries {attemptEvict=false}"]
-->ChkAttemptEvict{"attemptEvict ?"}
ChkAttemptEvict--Yes-->EvictPod["err=o.evictPod(ctx, pod, policyGroupVersion)"]
ChkAttemptEvict--No-->DelPod["err=o.deletePod(ctx, pod)"]
EvictPod-->ChkErr
DelPod-->ChkErr
ChkErr{"Check err"}
ChkErr--"Nil"-->ChkForceDelPods
ChkErr--"IsTooManyRequests(err)"-->GetPdb["
// Possible case where Pod couldn't be evicted because of PDB violation
pdbs, err = pdbLister.GetPodPodDisruptionBudgets(pod)
pdb = pdbs[0] if err == nil && len(pdbs) > 0
"]
ChkErr--"IsNotFound(err)\n(pod evicted)"-->SendNilChannel-->NilReturn
ChkErr--"OtherErr"-->SendErrChannel
GetPdb-->ChkMisConfiguredPdb{"isMisconfiguredPdb(pdb)?"}
ChkMisConfiguredPdb--Yes-->SetPdbError["err=fmt.Errorf('pdb misconfigured')"]
SetPdbError-->SendErrChannel
ChkMisConfiguredPdb--No-->SleepEvictRetryInterval["time.Sleep(PodEvictionRetryInterval)"]
SleepEvictRetryInterval-->IncRetry["retry+=1"]-->SetAttemptEvict
SendErrChannel-->NilReturn
ChkForceDelPods{"o.ForceDeletePods"}
ChkForceDelPods--"Yes\n(dont wait for\npod disappear)"-->SendNilChannel
ChkForceDelPods--No-->GetPodTermGracePeriod["
// TODO: discuss this, shouldn't pod grace period override drain ?
timeout=Min(pod.Spec.TerminationGracePeriodSeconds,o.Timeout)
"]
-->SetBufferPeriod["bufferPeriod := 30 * time.Second"]
-->WaitForDelete["pendingPods=o.waitForDelete(pods, timeout,getPodFn)"]
-->ChkWaitForDelError{err != nil ?}
ChkWaitForDelError--Yes-->SendErrChannel
ChkWaitForDelError--No-->ChkPendingPodsLength{"len(pendingPods) > 0?"}
ChkPendingPodsLength--Yes-->SetTimeoutError["err = fmt.Errorf('pod term timeout')"]
SetTimeoutError-->SendErrChannel
ChkPendingPodsLength--No-->SendNilChannel
end
SendNilChannel["returnCh <- nil"]
SendErrChannel["returnCh <- err"]
NilReturn(("return"))
isMisconfiguredPdb
TODO: Discuss/Elaborate on why this is considered misconfigured. (A plausible reading: the status is current since the generation matches, all expected pods are healthy, and yet DisruptionsAllowed is 0, so such a budget can never permit an eviction.)
func isMisconfiguredPdbV1(pdb *policyv1.PodDisruptionBudget) bool {
    if pdb.ObjectMeta.Generation != pdb.Status.ObservedGeneration {
        return false
    }
    return pdb.Status.ExpectedPods > 0 &&
        pdb.Status.CurrentHealthy >= pdb.Status.ExpectedPods &&
        pdb.Status.DisruptionsAllowed == 0
}
drain.Options.evictPodsWithPv
func (o *Options) evictPodsWithPv(ctx context.Context,
attemptEvict bool,
pods []*corev1.Pod,
policyGroupVersion string,
getPodFn func(namespace, name string) (*corev1.Pod, error),
returnCh chan error)
NOTE
- See drain.Options.evictPodsWithPVInternal.
- This method basically delegates to o.evictPodsWithPVInternal with retry handling.
- TODO: The logic of this method could do with some refactoring!
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%%
flowchart TD
Begin((" "))
-->SortPods["
sortPodsByPriority(pods)
//Desc priority: *pods[i].Spec.Priority > *pods[j].Spec.Priority
"]
-->DoVolumeAccounting["
podVolumeInfoMap := o.doAccountingOfPvs(ctx, pods)
"]
-->ChkAttemptEvict{attemptEvict ?}
ChkAttemptEvict--Yes-->RetryTillLimit["
until MaxEvictRetries
"]
-->
InvokeHelper["
remainingPods, aborted = o.evictPodsWithPVInternal(ctx, attemptEvict, pods, podVolumeInfoMap, policyGroupVersion, returnCh)
"]
InvokeHelper-->ChkAbort{"
aborted ||
len(remainingPods) == 0
"}
ChkAbort--Yes-->RangeRemainingPods
ChkAbort--No-->Sleep["
pods = remainingPods
time.Sleep(PodEvictionRetryInterval)
"]
Sleep--loop-->RetryTillLimit
RetryTillLimit--loopend-->ChkRemaining{"len(remainingPods) > 0 && !aborted ?"}
ChkRemaining--Yes-->InvokeHelper1["
// force delete pods
remainingPods, _ = o.evictPodsWithPVInternal(ctx, false, pods, podVolumeInfoMap, policyGroupVersion, returnCh)
"]
ChkAttemptEvict--No-->InvokeHelper1
InvokeHelper1-->RangeRemainingPods
ChkRemaining--No-->RangeRemainingPods
RangeRemainingPods["pod := range remainingPods"]
RangeRemainingPods--aborted?-->SendNil["returnCh <- nil"]
RangeRemainingPods--attemptEvict?-->SendEvictErr["returnCh <- fmt.Errorf('pod evict error')"]
RangeRemainingPods--else-->SendDelErr["returnCh <- fmt.Errorf('pod delete error')"]
SendNil-->NilReturn
SendEvictErr-->NilReturn
SendDelErr-->NilReturn
NilReturn(("return"))
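A sketch of the descending-priority sort named in the chart (assumption: nil priorities sort last):
import (
    "math"
    "sort"

    corev1 "k8s.io/api/core/v1"
)

func sortPodsByPriority(pods []*corev1.Pod) {
    priority := func(p *corev1.Pod) int32 {
        if p.Spec.Priority != nil {
            return *p.Spec.Priority
        }
        return math.MinInt32 // assumption: treat missing priority as lowest
    }
    // Descending order: higher-priority pods come first.
    sort.SliceStable(pods, func(i, j int) bool {
        return priority(pods[i]) > priority(pods[j])
    })
}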
drain.Options.evictPodsWithPVInternal
FIXME: name case inconsistency with evictPodsWithPv
drain.Options.evictPodsWithPVInternal is a drain helper method that actually evicts/deletes pods and waits for volume detachment. It returns a remainingPods slice and a fastTrack boolean that is meant to abort the pod eviction and exit the calling go-routine. (TODO: this should be called aborted, or better, a custom error should be used here.)
func (o *Options) evictPodsWithPVInternal(ctx context.Context,
    attemptEvict bool,
    pods []*corev1.Pod,
    volMap map[string]PodVolumeInfo,
    policyGroupVersion string,
    returnCh chan error,
) (remainingPods []*corev1.Pod, fastTrack bool)
- Uses context.WithDeadline, passing in ctx and a deadline time after the drain timeout, to get a sub-context assigned to mainContext and a CancelFunc. The obtained CancelFunc is deferred (so it is always invoked when the method terminates).
- Maintains a pod slice retryPods which is initially empty.
- Iterates through the pods slice with i as the index variable:
  - Applies a select with one check case:
    - Check whether mainContext is closed/cancelled by attempting to read from its Done channel: <-mainContext.Done(). If this case matches:
      - Send nil on the return error channel: returnCh <- nil
      - Compute remainingPods as the retryPods slice appended with the pods yet to be iterated: pods[i+1:]...
      - Return remainingPods, true (aborted is true).
  - Record the pod eviction start time: podEvictionStartTime = time.Now()
  - Call volumeAttachmentHandler.AddWorker() to start tracking VolumeAttachments and obtain a volumeAttachmentEventCh receive channel on which attached/detached *VolumeAttachment objects can be received.
  - If attemptEvict is true, call the evictPod helper method, else call the deletePod helper method. Grab the err from the eviction/deletion.
  - If the eviction/deletion had an error, analyze err:
    - If both attemptEvict is true and apierrors.IsTooManyRequests(err) is true, this case is interpreted as an eviction failure due to a PDB violation:
      - Get the PodDisruptionBudget for the pod being iterated.
      - Check whether it is misconfigured. If so, send an error on returnCh, close the volumeAttachmentEventCh using volumeAttachmentHandler.DeleteWorker, and continue with the next loop iteration, i.e. go to the next pod.
    - If just apierrors.IsNotFound(err) is true, this means that the Pod is already gone from the node. Send nil on returnCh, call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) and continue with the next pod in the iteration.
    - Otherwise add the pod to the retryPods slice: retryPods = append(retryPods, pod), call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) and continue with the next pod in the iteration.
    - (NOTE: Error handling can be optimized; there is too much repetition.)
  - Log that the evict/delete was successful.
  - Get the PodVolumeInfo from volMap using the pod key.
  - Obtain a context and cancellation function for volume detachment using context.WithTimeout, passing in mainContext and a detach timeout computed as the termination grace period of the pod (pod.Spec.TerminationGracePeriodSeconds if not nil) added to PvDetachTimeout (from the drain options).
  - Invoke waitForDetach(ctx, podVolumeInfo, o.nodeName) and grab the err.
  - Invoke the cancel function for detach. NOTE: THIS IS NICHT GUT. The sub-context should be created INSIDE waitForDetach with a defer for the cancellation.
  - Analyze the detachment error:
    - If apierrors.IsNotFound(err) is true, this indicates that the node is not found:
      - Send nil on returnCh.
      - Call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh).
      - Compute remainingPods as the retryPods slice appended with the pods yet to be iterated: pods[i+1:]...
      - Return remainingPods, true (aborted is true).
    - For other errors:
      - Send the err on returnCh.
      - Call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh).
      - Continue with the next pod in the iteration.
  - Obtain a context and cancellation function for volume re-attachment using context.WithTimeout, passing in mainContext and drain.Options.PvReattachTimeout.
  - Invoke waitForReattach(ctx, podVolumeInfo, o.nodeName, volumeAttachmentEventCh) and grab the returned err.
  - Invoke the cancel function for reattach. NOTE: THIS IS NICHT GUT. The sub-context should be created INSIDE waitForReattach.
  - Analyze the re-attachment error:
    - If err is a reattachment timeout error, just log a warning. TODO: Confused about why we don't return an error on the return channel here.
    - Otherwise send the err on returnCh, call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) and continue with the next pod in the iteration.
  - YAWN. Someone is very fond of calling this again and again. Call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh).
  - Log the time taken for pod eviction + volume detachment + volume re-attachment to another node using time.Since(podEvictionStartTime).
  - Send nil on returnCh.
- Once the pod iteration loop is done: return retryPods, false. (A refactoring sketch for the repeated DeleteWorker calls follows.)
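Regarding the repeated DeleteWorker calls flagged above, a hedged refactoring sketch (hypothetical restructuring, not the current code): moving the per-pod work into a helper lets a single defer release the worker channel on every exit path.
import (
    "context"

    corev1 "k8s.io/api/core/v1"
)

// handleSinglePodWithPV is a hypothetical helper illustrating the defer-based cleanup.
func (o *Options) handleSinglePodWithPV(ctx context.Context, pod *corev1.Pod) (aborted bool) {
    volumeAttachmentEventCh := o.volumeAttachmentHandler.AddWorker()
    // One deferred call replaces the many per-branch DeleteWorker invocations.
    defer o.volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh)
    // ... evict/delete the pod, waitForDetach, waitForReattach, write to returnCh ...
    // Every return path here now releases the worker channel automatically.
    return false
}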