- Node Drain
- Drain Utilities
- Drain
- Drain Types
- drain.PodVolumeInfo
- drain.Options.evictPod
- drain.Options.deletePod
- drain.Options.getPodsForDeletion
- drain.Options.getPVList
- drain.Options.getVolIDsFromDriver
- drain.Options.doAccountingOfPvs
- drain.filterPodsWithPv
- drain.Options.waitForDetach
- drain.Options.waitForReattach
- drain.Options.waitForDelete
- drain.Options.RunDrain
- drain.Options.evictPodsWithoutPv
- drain.Options.evictPodsWithPv
- drain.Options.evictPodsWithPVInternal
Node Drain
Node Drain code is in github.com/gardener/machine-controller-manager/pkg/util/provider/drain/drain.go
Drain Utilities
VolumeAttachmentHandler
pkg/util/provider/drain.VolumeAttachmentHandler is a handler used to distribute incoming k8s.io/api/storage/v1.VolumeAttachment requests to a number of workers, where each worker is a channel of type *VolumeAttachment.
A k8s.io/api/storage/v1.VolumeAttachment is a non-namespaced k8s object that captures the intent to attach or detach the specified volume to/from the specified node. See VolumeAttachment
type VolumeAttachmentHandler struct {
	sync.Mutex
	workers []chan *storagev1.VolumeAttachment
}

// NewVolumeAttachmentHandler returns a new VolumeAttachmentHandler
func NewVolumeAttachmentHandler() *VolumeAttachmentHandler {
	return &VolumeAttachmentHandler{
		Mutex:   sync.Mutex{},
		workers: []chan *storagev1.VolumeAttachment{},
	}
}
VolumeAttachmentHandler.AddWorker
AddWorker appends a buffered channel of size 20 of type *storagev1.VolumeAttachment to the workers slice in VolumeAttachmentHandler. The assumption is that no more than 20 unprocessed objects exist at a given time; if requests are buffered beyond this limit, the channel starts dropping writes. See the dispatch method.
func (v *VolumeAttachmentHandler) AddWorker() chan *storagev1.VolumeAttachment {
	// chanSize is the channel buffer size to hold requests.
	// This assumes that no more than chanSize unprocessed objects exist at a given time.
	// Requests buffered beyond this limit will be dropped by dispatch.
	const chanSize = 20

	klog.V(4).Infof("Adding new worker. Current active workers %d - %v", len(v.workers), v.workers)

	v.Lock()
	defer v.Unlock()

	newWorker := make(chan *storagev1.VolumeAttachment, chanSize)
	v.workers = append(v.workers, newWorker)

	klog.V(4).Infof("Successfully added new worker %v. Current active workers %d - %v", newWorker, len(v.workers), v.workers)
	return newWorker
}
VolumeAttachmentHandler.dispatch
The dispatch method is responsible for distributing incoming VolumeAttachments to the registered worker channels.
func (v *VolumeAttachmentHandler) dispatch(obj interface{}) {
	if len(v.workers) == 0 {
		// As no workers are registered, nothing to do here.
		return
	}
	volumeAttachment := obj.(*storagev1.VolumeAttachment)
	v.Lock()
	defer v.Unlock()

	for i, worker := range v.workers {
		select {
		// submit volume attachment to the worker channel if channel is not full
		case worker <- volumeAttachment:
		default:
			klog.Warningf("Worker %d/%v is full. Discarding value.", i, worker)
			// TODO: Umm..isn't this problematic if we miss this ?
		}
	}
}
The Add|Update
methods below delegate to dispatch. The usage of this utility involves specifying the add/update methods below as the event handler callbacks on an instance of k8s.io/client-go/informers/storage/v1.VolumeAttachmentInformer. This way incoming volume attachments are distributed to several worker channels.
func (v *VolumeAttachmentHandler) AddVolumeAttachment(obj interface{}) {
	v.dispatch(obj)
}

func (v *VolumeAttachmentHandler) UpdateVolumeAttachment(oldObj, newObj interface{}) {
	v.dispatch(newObj)
}
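On the consumer side, the PV-aware drain flow (described later in this document) registers a worker channel before evicting a pod and de-registers it when done. A minimal usage sketch, assuming a handler instance named handler and the DeleteWorker method referenced later in this document:

```go
// Usage sketch: register a worker channel, receive VolumeAttachment events while doing work,
// then de-register the worker (which also stops further deliveries to this channel).
volumeAttachmentEventCh := handler.AddWorker()

select {
case va := <-volumeAttachmentEventCh:
	// React to the attach/detach event, e.g. inspect va.Status.Attached and va.Spec.NodeName.
	_ = va
case <-time.After(30 * time.Second):
	// Give up waiting after some timeout.
}

handler.DeleteWorker(volumeAttachmentEventCh)
```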
VolumeAttachmentHandler Initialization in MC
During construction of the MC controller struct, the volume attachment handler's callback methods are registered as event handlers on the volume attachment informer.
func NewController(...) {
	//...
	controller.volumeAttachmentHandler = drain.NewVolumeAttachmentHandler()
	volumeAttachmentInformer.Informer().AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc:    controller.volumeAttachmentHandler.AddVolumeAttachment,
			UpdateFunc: controller.volumeAttachmentHandler.UpdateVolumeAttachment,
		})
	//...
}
Drain
Drain Types
Drain Constants
- PodEvictionRetryInterval is the interval at which to retry eviction for pods.
- GetPvDetailsMaxRetries is the maximum number of retries to get PV details using the PersistentVolumeLister or PersistentVolumeClaimLister.
- GetPvDetailsRetryInterval is the interval at which to retry getting PV details.
const (
	PodEvictionRetryInterval  = time.Second * 20
	GetPvDetailsMaxRetries    = 3
	GetPvDetailsRetryInterval = time.Second * 5
)
drain.Options
drain.Options holds the configurable options used while draining a node before deletion.
NOTE: Unused fields and fields with constant defaults are omitted for brevity.
type Options struct {
	client                  kubernetes.Interface
	kubernetesVersion       *semver.Version
	Driver                  driver.Driver
	drainStartedOn          time.Time
	drainEndedOn            time.Time
	ErrOut                  io.Writer
	ForceDeletePods         bool
	MaxEvictRetries         int32
	PvDetachTimeout         time.Duration
	PvReattachTimeout       time.Duration
	nodeName                string
	Out                     io.Writer
	pvcLister               corelisters.PersistentVolumeClaimLister
	pvLister                corelisters.PersistentVolumeLister
	pdbV1Lister             policyv1listers.PodDisruptionBudgetLister
	nodeLister              corelisters.NodeLister
	volumeAttachmentHandler *VolumeAttachmentHandler
	Timeout                 time.Duration
}
drain.PodVolumeInfo
drain.PodVolumeInfo is the struct used to encapsulate the PV names and PV IDs for all the PVs attached to a pod.
type PodVolumeInfo struct {
	persistentVolumeList []string
	volumeList           []string
}
NOTE: The struct fields are badly named.
- PodVolumeInfo.persistentVolumeList is a slice of persistent volume names, taken from PersistentVolumeSpec.VolumeName.
- PodVolumeInfo.volumeList is a slice of persistent volume IDs, obtained using driver.GetVolumeIDs given the PV spec. This is generally the CSI volume ID.
drain.Options.evictPod
func (o *Options) evictPod(ctx context.Context, pod *corev1.Pod, policyGroupVersion string) error
drain.Options.evictPod is a simple helper method to evict a Pod using the Eviction API.
- TODO: GracePeriodSeconds in the code is useless here and should be removed, as it is always -1.
- TODO: Currently this method uses the old k8s.io/api/policy/v1beta1. It must be changed to k8s.io/api/policy/v1. (A sketch of a policy/v1 version follows the flowchart below.)
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TD Begin(("" )) -->InitTypeMeta[" typeMeta:= metav1.TypeMeta{ APIVersion: policyGroupVersion, Kind: 'Eviction', }, "] -->InitObjectMeta[" objectMeta := ObjectMeta: metav1.ObjectMeta{ Name: pod.Name, Namespace: pod.Namespace, }, "] -->InitEviction[" eviction := &.Eviction{TypeMeta: typeMeta, ObjectMeta: objectMeta } "] -->EvictPod[" err := o.client.PolicyV1beta1().Evictions(eviction.Namespace).Evict(ctx, eviction) "] -->ReturnErr(("return err")) style InitEviction text-align:left
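Since the TODO above calls for moving to policy/v1, here is a minimal hedged sketch of what such an eviction helper could look like. It is written as a standalone function (evictPodV1 is a hypothetical name, not the MCM implementation) using the client-go PolicyV1 eviction client:

```go
import (
	"context"

	corev1 "k8s.io/api/core/v1"
	policyv1 "k8s.io/api/policy/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// evictPodV1 evicts the given pod using the policy/v1 Eviction subresource.
// GracePeriodSeconds is deliberately not set, mirroring the note above that it is unused.
func evictPodV1(ctx context.Context, client kubernetes.Interface, pod *corev1.Pod, policyGroupVersion string) error {
	eviction := &policyv1.Eviction{
		TypeMeta: metav1.TypeMeta{
			APIVersion: policyGroupVersion,
			Kind:       "Eviction",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      pod.Name,
			Namespace: pod.Namespace,
		},
	}
	return client.PolicyV1().Evictions(eviction.Namespace).Evict(ctx, eviction)
}
```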
drain.Options.deletePod
Simple helper method to delete a Pod. It just delegates to PodInterface.Delete:
func (o *Options) deletePod(ctx context.Context, pod *corev1.Pod) error {
	return o.client.CoreV1().Pods(pod.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
}
drain.Options.getPodsForDeletion
drain.getPodsForDeletion
returns all the pods we're going to delete. If there are any pods preventing us from deleting, we return that list in an error.
func (o *Options) getPodsForDeletion(ctx context.Context)
(pods []corev1.Pod, err error)
- Get all pods associated with the node.
podList, err := o.client.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{ FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": o.nodeName}).String()})
- Iterate through podList and apply a set of pod filters (illustrative sketches of two filters follow this list):
  - Remove mirror pods from consideration for deletion. See Static Pods.
  - Local storage filter. Discuss: seems useless. If a Pod has local storage, remove it from consideration. This filter iterates through the Pod.Spec.Volumes slice and checks whether Volume.EmptyDir is non-nil in order to determine whether the pod uses local storage.
  - A Pod whose Pod.Status.Phase is PodSucceeded or PodFailed is eligible for deletion.
  - If a Pod has a controller owner reference, it is eligible for deletion. (TODO: Unsure why this makes a difference anyways.)
  - The final pod filter, daemonsetFilter, seems useless. Discuss.
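Illustrative sketches of two of the filters described above (hypothetical helper names; not the exact MCM filter implementation):

```go
// isMirrorPod reports whether the pod is a static/mirror pod; such pods cannot be deleted
// through the API server and are therefore skipped by drain.
func isMirrorPod(pod *corev1.Pod) bool {
	_, found := pod.ObjectMeta.Annotations[corev1.MirrorPodAnnotationKey]
	return found
}

// hasLocalStorage reports whether the pod uses local (EmptyDir) storage,
// per the Volume.EmptyDir check described above.
func hasLocalStorage(pod *corev1.Pod) bool {
	for _, vol := range pod.Spec.Volumes {
		if vol.EmptyDir != nil {
			return true
		}
	}
	return false
}
```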
drain.Options.getPVList
NOTE: Should be called getPVNames. Gets a slice of the persistent volume names bound to the Pod through its claims. Contains time.Sleep and retry handling up to a limit. Unsure if this is the best way. Discuss.
func (o *Options) getPVList(pod *corev1.Pod) (pvNames []string, err error)
- Iterate over pod.Spec.Volumes.
  - If the volume.PersistentVolumeClaim reference is not nil, get the PersistentVolumeClaim from o.pvcLister using vol.PersistentVolumeClaim.ClaimName.
    - Error handling and retries are implemented for this lookup, until GetPvDetailsMaxRetries is reached, with interval GetPvDetailsRetryInterval between attempts.
  - Add pvc.Spec.VolumeName to pvNames.
- Return pvNames.
A compact sketch of this flow follows below.
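A compact sketch of the described flow, assuming the constants and the PVC lister shown earlier (illustrative, not the exact MCM code):

```go
// getPVNames resolves the PV names bound to a pod via its PVC claims, retrying each PVC
// lookup up to GetPvDetailsMaxRetries times with GetPvDetailsRetryInterval between attempts.
func getPVNames(pod *corev1.Pod, pvcLister corelisters.PersistentVolumeClaimLister) ([]string, error) {
	var pvNames []string
	for _, vol := range pod.Spec.Volumes {
		if vol.PersistentVolumeClaim == nil {
			continue
		}
		var (
			pvc *corev1.PersistentVolumeClaim
			err error
		)
		for retry := 0; retry < GetPvDetailsMaxRetries; retry++ {
			pvc, err = pvcLister.PersistentVolumeClaims(pod.Namespace).Get(vol.PersistentVolumeClaim.ClaimName)
			if err == nil {
				break
			}
			time.Sleep(GetPvDetailsRetryInterval)
		}
		if err != nil {
			return nil, err
		}
		pvNames = append(pvNames, pvc.Spec.VolumeName)
	}
	return pvNames, nil
}
```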
drain.Options.getVolIDsFromDriver
Given a slice of PV names, this method gets the corresponding volume IDs from the driver.
- It does this by first getting the PersistentVolumeSpec using o.pvLister.Get(pvName) for each PV name and adding it to the pvSpecs slice of type PersistentVolumeSpec. See k8s.io/client-go/listers/core/v1.PersistentVolumeLister.
- Retry handling is implemented while looking up each pvName, until GetPvDetailsMaxRetries is reached, with a sleep interval of GetPvDetailsRetryInterval between retry attempts.
- Once the pvSpecs slice is populated, it constructs a driver.GetVolumeIDsRequest from it, invokes driver.GetVolumeIDs(driver.GetVolumeIDsRequest) to obtain the driver.GetVolumeIDsResponse, and returns driver.GetVolumeIDsResponse.VolumeIDs.
TODO: BUG? In case a PV is not found or the retry limit is reached, the returned slice of volume IDs will not have a 1:1 correspondence with the slice of PV names passed in.
A hedged sketch of this flow follows the signature below.
func (o *Options) getVolIDsFromDriver(ctx context.Context, pvNames []string) ([]string, error)
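A hedged sketch of the flow described above. The driver request/response field names (PVSpecs, VolumeIDs) follow the description above, but treat them as assumptions rather than verified signatures:

```go
// getVolIDsFromDriver resolves PV specs via the PV lister (with retries) and asks the driver
// for the corresponding volume IDs.
func getVolIDsFromDriver(ctx context.Context, drv driver.Driver,
	pvLister corelisters.PersistentVolumeLister, pvNames []string) ([]string, error) {
	var pvSpecs []*corev1.PersistentVolumeSpec
	for _, pvName := range pvNames {
		var (
			pv  *corev1.PersistentVolume
			err error
		)
		for retry := 0; retry < GetPvDetailsMaxRetries; retry++ {
			if pv, err = pvLister.Get(pvName); err == nil {
				break
			}
			time.Sleep(GetPvDetailsRetryInterval)
		}
		if err != nil {
			// NOTE: skipping here is what breaks the 1:1 correspondence mentioned in the TODO above.
			continue
		}
		pvSpecs = append(pvSpecs, &pv.Spec)
	}
	resp, err := drv.GetVolumeIDs(ctx, &driver.GetVolumeIDsRequest{PVSpecs: pvSpecs})
	if err != nil {
		return nil, err
	}
	return resp.VolumeIDs, nil
}
```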
drain.Options.doAccountingOfPvs
drain.Options.doAccountingOfPvs returns a map of the pod key pod.Namespace + '/' + pod.Name
to a PodVolumeInfo struct which holds a slice of PV names and PV IDs.
NOTES:
- See filterSharedPVs
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TD Begin((" ")) -->Init[" podKey2VolNamesMap = make(map[string][]string) podKey2VolInfoMap = make(map[string]PodVolumeInfo) "] -->RangePods[" for pod := range pods "] -->PopPod2VolNames[" podKey2VolNamesMap[pod.Namespace + '/' pod.Name] = o.getPVList(pod) "] --loop-->RangePods PopPod2VolNames--done-->FilterSharedPVs[" filterSharedPVs(podKey2VolNamesMap) // filters out the PVs that are shared among pods. "] -->RangePodKey2VolNamesMap[" for podKey, volNames := range podKey2VolNamesMap "] -->GetVolumeIds[" volumeIds, err := o.getVolIDsFromDriver(ctx, volNames) if err != nil continue; //skip set of volumes "] -->InitPodVolInfo[" podVolumeInfo := PodVolumeInfo{ persistentVolumeList: volNames, volumeList: volumeIds } //struct field names are bad. "] -->PopPodVolInfoMap[" podVolumeInfoMap[podKey] = podVolumeInfo "] --loop-->RangePodKey2VolNamesMap PopPodVolInfoMap--done-->Return(("return podVolumeInfoMap"))
drain.filterPodsWithPv
NOTE: should have been named partitionPodsWithPVC
Utility function that iterates through the given pods and, for each pod, iterates through its pod.Spec.Volumes, checking volume.PersistentVolumeClaim for each volume. If any volume has a non-nil PersistentVolumeClaim reference, the pod is added to the podsWithPV slice; otherwise it is added to the podsWithoutPV slice. (A sketch follows the signature below.)
func filterPodsWithPv(pods []corev1.Pod)
(podsWithPV []*corev1.Pod, podsWithoutPV []*corev1.Pod)
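A sketch of the partitioning logic described above (it matches the description rather than being a verbatim copy of the MCM code):

```go
// filterPodsWithPv partitions pods into those that reference at least one PersistentVolumeClaim
// and those that do not.
func filterPodsWithPv(pods []corev1.Pod) (podsWithPV []*corev1.Pod, podsWithoutPV []*corev1.Pod) {
	for i := range pods {
		pod := &pods[i]
		hasPVC := false
		for _, vol := range pod.Spec.Volumes {
			if vol.PersistentVolumeClaim != nil {
				hasPVC = true
				break
			}
		}
		if hasPVC {
			podsWithPV = append(podsWithPV, pod)
		} else {
			podsWithoutPV = append(podsWithoutPV, pod)
		}
	}
	return podsWithPV, podsWithoutPV
}
```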
drain.Options.waitForDetach
func (o *Options) waitForDetach(ctx context.Context,
podVolumeInfo PodVolumeInfo,
nodeName string) error
Summary (a compact sketch follows this list):
- Initialize a boolean found to true (representing that a volume is still attached to the node).
- Begin a loop while found is true:
  - Use a select to check whether a signal has been received from context.Done() (i.e. the context was cancelled). If so, return an error with the message that a timeout occurred while waiting for the PVs to be detached.
  - Set found to false.
  - Get the Node associated with nodeName using the nodeLister and assign it to node. If there is an error, return from the function.
  - Get node.Status.VolumesAttached, which returns a []AttachedVolume, and assign it to attachedVols.
    - Return nil if this slice is empty.
  - Begin iterating over range podVolumeInfo.volumeList, assigning volumeID in the parent iteration. Label this iteration LookUpVolume.
    - Begin an inner iteration over attachedVols, assigning attachedVol in the nested iteration.
    - If attachedVol.Name is contained in volumeID, then this volume is still attached to the node:
      - Set found to true.
      - Sleep for VolumeDetachPollInterval seconds (5 seconds).
      - Break out of LookUpVolume.
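A compact sketch of the detach-wait loop described above. The volume-matching predicate is passed in as a parameter because the exact comparison of a CSI volume ID against AttachedVolume.Name is implementation-specific; the rest follows the summary:

```go
// waitForVolumeDetach polls the node's attached volumes until none of the given volume IDs
// are attached anymore, or the context is cancelled.
func waitForVolumeDetach(ctx context.Context, nodeLister corelisters.NodeLister, nodeName string,
	volumeIDs []string, stillAttached func(volumeID, attachedVolName string) bool) error {
	const volumeDetachPollInterval = 5 * time.Second // VolumeDetachPollInterval per the notes above

	found := true
	for found {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timeout while waiting for PVs to detach from node %q", nodeName)
		default:
		}
		found = false

		node, err := nodeLister.Get(nodeName)
		if err != nil {
			return err
		}
		attachedVols := node.Status.VolumesAttached
		if len(attachedVols) == 0 {
			return nil
		}

	LookUpVolume:
		for _, volumeID := range volumeIDs {
			for _, attachedVol := range attachedVols {
				if stillAttached(volumeID, string(attachedVol.Name)) {
					found = true
					time.Sleep(volumeDetachPollInterval)
					break LookUpVolume
				}
			}
		}
	}
	return nil
}
```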
drain.Options.waitForReattach
Purpose: waits for the persistent volume names in podVolumeInfo.persistentVolumeList to be re-attached (to another node).
But I am still confused about why we need to call this during the drain flow. Why should we wait for PVs to be attached to another node? After all, there is no guarantee they will be attached, right? (A compact sketch follows the step list below.)
func (o *Options) waitForReattach(ctx context.Context,
podVolumeInfo PodVolumeInfo,
previousNodeName string,
volumeAttachmentEventCh chan *VolumeAttachment) error
- Construct a map: var pvsWaitingForReattachments map[string]bool.
- Initialize the above map by ranging through podVolumeInfo.persistentVolumeList, taking each persistentVolumeName and setting pvsWaitingForReattachments[persistentVolumeName] = true.
- Start a for loop.
  - Commence a select with the following cases:
    - Case: check whether the context is closed/cancelled by reading <-ctx.Done(). If so, return an error with the message that a timeout occurred while waiting for PVs to reattach to another node.
    - Case: obtain a *VolumeAttachment by reading from the channel: incomingEvent := <-volumeAttachmentEventCh.
      - Get the persistentVolumeName associated with this attachment event: persistentVolumeName := *incomingEvent.Spec.Source.PersistentVolumeName.
      - Check whether this persistent volume is being tracked, i.e. pvsWaitingForReattachments[persistentVolumeName] is present.
      - Check whether the volume was attached to another node: incomingEvent.Status.Attached && incomingEvent.Spec.NodeName != previousNodeName.
      - If both of the above hold, delete the entry for persistentVolumeName from the pvsWaitingForReattachments map.
  - If pvsWaitingForReattachments is empty, break out of the loop.
- Log that the volumes in podVolumeInfo.persistentVolumeList have been successfully re-attached and return nil.
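A compact sketch of the reattach-wait loop described above, using the same channel and fields (illustrative):

```go
// waitForReattach consumes VolumeAttachment events until every tracked PV is reported as
// attached to a node other than previousNodeName, or the context is cancelled.
func waitForReattach(ctx context.Context, pvNames []string, previousNodeName string,
	volumeAttachmentEventCh chan *storagev1.VolumeAttachment) error {
	pvsWaitingForReattachments := make(map[string]bool, len(pvNames))
	for _, pvName := range pvNames {
		pvsWaitingForReattachments[pvName] = true
	}

	for len(pvsWaitingForReattachments) > 0 {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timeout while waiting for PVs %v to reattach to another node", pvNames)
		case incomingEvent := <-volumeAttachmentEventCh:
			if incomingEvent.Spec.Source.PersistentVolumeName == nil {
				continue
			}
			persistentVolumeName := *incomingEvent.Spec.Source.PersistentVolumeName
			if pvsWaitingForReattachments[persistentVolumeName] &&
				incomingEvent.Status.Attached && incomingEvent.Spec.NodeName != previousNodeName {
				delete(pvsWaitingForReattachments, persistentVolumeName)
			}
		}
	}
	return nil
}
```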
drain.Options.waitForDelete
NOTE: Ideally should have been named waitForPodDisappearance
pkg/util/provider/drain.Options.waitForDelete is a helper method defined on drain.Options
that leverages wait.PollImmediate and the getPodFn
(get pod by name and namespace) and checks that all pods have disappeared within timeout
. The set of pods that did not disappear within timeout is returned as pendingPods
func (o *Options) waitForDelete(
pods []*corev1.Pod, interval,
timeout time.Duration,
getPodFn func(string, string) (*corev1.Pod, error)
) (pendingPods []*corev1.Pod, err error)
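A sketch of the polling pattern described above. The UID comparison is an assumption borrowed from the analogous kubectl drain helper, not necessarily what MCM does:

```go
// waitForPodDisappearance polls with wait.PollImmediate until every pod is no longer returned
// by getPodFn (or has been replaced), returning whatever pods are still pending at timeout.
func waitForPodDisappearance(pods []*corev1.Pod, interval, timeout time.Duration,
	getPodFn func(namespace, name string) (*corev1.Pod, error)) ([]*corev1.Pod, error) {
	var pendingPods []*corev1.Pod
	err := wait.PollImmediate(interval, timeout, func() (bool, error) {
		pendingPods = pendingPods[:0]
		for _, pod := range pods {
			p, err := getPodFn(pod.Namespace, pod.Name)
			if apierrors.IsNotFound(err) || (p != nil && p.ObjectMeta.UID != pod.ObjectMeta.UID) {
				continue // pod is gone (or has been recreated with a new UID)
			} else if err != nil {
				return false, err
			}
			pendingPods = append(pendingPods, pod)
		}
		return len(pendingPods) == 0, nil
	})
	return pendingPods, err
}
```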
drain.Options.RunDrain
Context: drain.Options.RunDrain
is called from the MC helper method controller.drainNode
which in turn is called from controller.triggerDeletionFlow
when the machine.Status.LastOperation.Description
contains operation machineutils.InitiateDrain
.
If RunDrain returns an error, the drain is retried at a later time by putting the machine key back into the work queue, unless the force-deletion label on the machine object is true, in which case we proceed to VM deletion.
func (o *Options) RunDrain(ctx context.Context) error
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TD GetNode["node, err = .client.CoreV1().Nodes().Get(ctx, o.nodeName, metav1.GetOptions{})"] -->ChkGetNodeErr{err != nil} ChkGetNodeErr--Yes-->ReturnNilEarly(("return nil (case where deletion triggered during machine creation, so node is nil. TODO: should use apierrors.NotFound)")) ChkGetNodeErr--No-->ChkNodeUnschedulable{node.Spec.Unschedulable?} ChkNodeUnschedulable--Yes-->GetPodsForDeletion[" pods, err := o.getPodsForDeletion(ctx) if err!=nil return err"] ChkNodeUnschedulable--No-->CloneNode["clone := node.DeepCopy() clone.Spec.Unschedulable = true"] -->UpdateNode["_, err = o.client.CoreV1().Nodes().Update(ctx, clone, metav1.UpdateOptions{}) if err != nil return err "] UpdateNode-->GetPodsForDeletion GetPodsForDeletion-->GetEvictionPGV[" policyGroupVersion, err := SupportEviction(o.client) if err != nil return err "] --> DefineAttemptEvict[" attemptEvict := !o.ForceDeletePods && len(policyGroupVersion) > 0 // useless boolean which confuses matters considerably. "] --> DefineGetPodFn[" getPodFn := func(namespace, name string) (*corev1.Pod, error) { return o.client.CoreV1().Pods(namespace).Get(ctx, name, metav1.GetOptions{}) }"] --> CreateReturnChannel[" returnCh := make(chan error, len(pods)) defer close(returnCh) "] -->ChkForceDelPods{"o.ForceDeletePods?"} ChkForceDelPods--Yes-->EvictPodsWithoutPV["go o.evictPodsWithoutPv(ctx, attemptEvict, pods, policyGroupVersion, getPodFn, returnCh) // go-routine feels un-necessary here."] ChkForceDelPods--No-->FilterPodsWithPv[" podsWithPv, podsWithoutPv := filterPodsWithPv(pods) "] FilterPodsWithPv-->EvictPodsWithPv[" go o.evictPodsWithPv(ctx, attemptEvict, podsWithPv, policyGroupVersion, getPodFn, returnCh) "] -->EvictPodsWithoutPV1[" go o.evictPodsWithoutPv(ctx, attemptEvict, podsWithoutPv, policyGroupVersion, getPodFn, returnCh) "]-->CreateAggregateError EvictPodsWithoutPV-->CreateAggregateError[" var errors []error errors = for i = 0; i < len(pods); ++i { err := <-returnCh if err != nil { errors = append(errors, err) } } "] -->ReturnAggError(("return\nerrors.NewAggregate(errors)"))
Notes:
- machine-controller-manager/pkg/util/provider/drain.SupportEviction uses the Discovery API to find out whether the server supports the eviction subresource and, if so, returns its groupVersion, or "" if it doesn't.
- k8s.io/kubectl/pkg/drain.CheckEvictionSupport already does this.
- The attemptEvict boolean usage is confusing. Stick to drain.Options.ForceDeletePods.
- TODO: GAP? For cordoning a Node we currently just set Node.Spec.Unschedulable. But we are also supposed to set the taint node.kubernetes.io/unschedulable; the spec-based way is supposed to be deprecated. (A hedged sketch of cordoning with both follows below.)
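A hedged sketch of what cordoning with both the spec field and the taint could look like, per the GAP note above. This is not what MCM currently does; corev1.TaintNodeUnschedulable is the upstream constant for node.kubernetes.io/unschedulable:

```go
// cordonNode marks the node unschedulable and also adds the node.kubernetes.io/unschedulable
// taint, so that schedulers honoring taints are covered as well.
func cordonNode(ctx context.Context, client kubernetes.Interface, nodeName string) error {
	node, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
	if err != nil {
		return err
	}
	clone := node.DeepCopy()
	clone.Spec.Unschedulable = true

	hasTaint := false
	for _, t := range clone.Spec.Taints {
		if t.Key == corev1.TaintNodeUnschedulable {
			hasTaint = true
			break
		}
	}
	if !hasTaint {
		clone.Spec.Taints = append(clone.Spec.Taints, corev1.Taint{
			Key:    corev1.TaintNodeUnschedulable,
			Effect: corev1.TaintEffectNoSchedule,
		})
	}
	_, err = client.CoreV1().Nodes().Update(ctx, clone, metav1.UpdateOptions{})
	return err
}
```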
drain.Options.evictPodsWithoutPv
drain method that iterates through each given pod and, for each pod, launches a go-routine that simply delegates to Options.evictPodWithoutPVInternal.
func (o *Options) evictPodsWithoutPv(ctx context.Context,
	attemptEvict bool,
	pods []*corev1.Pod,
	policyGroupVersion string, //eviction API's GV
	getPodFn func(namespace, name string) (*corev1.Pod, error),
	returnCh chan error) {
	for _, pod := range pods {
		go o.evictPodWithoutPVInternal(ctx, attemptEvict, pod, policyGroupVersion, getPodFn, returnCh)
	}
	return
}
drain.Options.evictPodWithoutPVInternal
drain method that either evicts or deletes a Pod, with retry handling until Options.MaxEvictRetries is reached.
func (o *Options) evictPodWithoutPVInternal(
ctx context.Context,
attemptEvict bool,
pod *corev1.Pod,
policyGroupVersion string,
getPodFn func(namespace, name string) (*corev1.Pod, error),
returnCh chan error)
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TD RangePod["pod := range pods"] RangePod-->EvictOrDelPod["go evictPodWithoutPVInternal(attemptEvict bool, pod, policyGroupVersion,pod,getPodFn,returnCh)"] EvictOrDelPod-->Begin subgraph "evictPodWithoutPVInternal (evicts or deletes Pod) " Begin(("Begin"))-->SetRetry["retry := 0"] SetRetry -->SetAttemptEvict["if retry >= o.MaxEvictRetries {attemptEvict=false}"] -->ChkAttemptEvict{"attemptEvict ?"} ChkAttemptEvict--Yes-->EvictPod["err=o.evictPod(ctx, pod, policyGroupVersion)"] ChkAttemptEvict--No-->DelPod["err=o.deletePod(ctx, pod)"] EvictPod-->ChkErr DelPod-->ChkErr ChkErr{"Check err"} ChkErr--"Nil"-->ChkForceDelPods ChkErr--"IsTooManyRequests(err)"-->GetPdb[" // Possible case where Pod couldn't be evicted because of PDB violation pdbs, err = pdbLister.GetPodPodDisruptionBudgets(pod) pdb=pdbs[0] if err !=nil && len(pdbs) > 0 "] ChkErr--"IsNotFound(err)\n(pod evicted)"-->SendNilChannel-->NilReturn ChkErr--"OtherErr"-->SendErrChannel GetPdb-->ChkMisConfiguredPdb{"isMisconfiguredPdb(pdb)?"} ChkMisConfiguredPdb--Yes-->SetPdbError["err=fmt.Errorf('pdb misconfigured')"] SetPdbError-->SendErrChannel ChkMisConfiguredPdb--No-->SleepEvictRetryInterval["time.Sleep(PodEvictionRetryInterval)"] SleepEvictRetryInterval-->IncRetry["retry+=1"]-->SetAttemptEvict SendErrChannel-->NilReturn ChkForceDelPods{"o.ForceDeletePods"} ChkForceDelPods--"Yes\n(dont wait for\npod disappear)"-->SendNilChannel ChkForceDelPods--No-->GetPodTermGracePeriod[" // TODO: discuss this, shouldn't pod grace period override drain ? timeout=Min(pod.Spec.TerminationGracePeriodSeconds,o.Timeout) "] -->SetBufferPeriod["bufferPeriod := 30 * time.Second"] -->WaitForDelete["pendingPods=o.waitForDelete(pods, timeout,getPodFn)"] -->ChkWaitForDelError{err != nil ?} ChkWaitForDelError--Yes-->SendErrChannel ChkWaitForDelError--No-->ChkPendingPodsLength{"len(pendingPods) > 0?"} ChkPendingPodsLength--Yes-->SetTimeoutError["err = fmt.Errorf('pod term timeout')"] SetTimeoutError-->SendErrChannel ChkPendingPodsLength--No-->SendNilChannel end SendNilChannel["returnCh <- nil"] SendErrChannel["returnCh <- err"] NilReturn(("return"))
isMisconfiguredPdb
TODO: Discuss/Elaborate on why this is considered misconfigured. (Presumably: the PDB status is up to date, all expected pods are currently healthy, and yet DisruptionsAllowed is 0, so an eviction can never succeed under this PDB.)
func isMisconfiguredPdbV1(pdb *policyv1.PodDisruptionBudget) bool {
	if pdb.ObjectMeta.Generation != pdb.Status.ObservedGeneration {
		return false
	}

	return pdb.Status.ExpectedPods > 0 &&
		pdb.Status.CurrentHealthy >= pdb.Status.ExpectedPods &&
		pdb.Status.DisruptionsAllowed == 0
}
drain.Options.evictPodsWithPv
func (o *Options) evictPodsWithPv(ctx context.Context,
attemptEvict bool,
pods []*corev1.Pod,
policyGroupVersion string,
getPodFn func(namespace, name string) (*corev1.Pod, error),
returnCh chan error)
NOTE:
- See drain.Options.evictPodsWithPv
- This method basically delegates to o.evictPodsWithPVInternal with retry handling.
- TODO: The logic of this method could do with some refactoring!
%%{init: {'themeVariables': { 'fontSize': '10px'}, "flowchart": {"useMaxWidth": false }}}%% flowchart TD Begin((" ")) -->SortPods[" sortPodsByPriority(pods) //Desc priority: pods[i].Spec.Priority > *pods[j].Spec.Priority "] -->DoVolumeAccounting[" podVolumeInfoMap := o.doAccountingOfPvs(ctx, pods) "] -->ChkAttemptEvict{attemptEvict ?} ChkAttemptEvict--Yes-->RetryTillLimit[" until MaxEvictRetries "] --> InvokeHelper[" remainingPods, aborted = o.evictPodsWithPVInternal(ctx, attemptEvict, pods, podVolumeInfoMap, policyGroupVersion, returnCh) "] InvokeHelper-->ChkAbort{" aborted || len(remainingPods) == 0 "} ChkAbort--Yes-->RangeRemainingPods ChkAbort--No-->Sleep[" pods = remainingPods time.Sleep(PodEvictionRetryInterval) "] Sleep--loop-->RetryTillLimit RetryTillLimit--loopend-->ChkRemaining{"len(remainingPods) > 0 && !aborted ?"} ChkRemaining--Yes-->InvokeHelper1[" // force delete pods remainingPods, _ = o.evictPodsWithPVInternal(ctx, false, pods, podVolumeInfoMap, policyGroupVersion, getPodFn, returnCh) "] ChkAttemptEvict--No-->InvokeHelper1 InvokeHelper1-->RangeRemainingPods ChkRemaining--No-->RangeRemainingPods RangeRemainingPods["pod := range remainingPods"] RangeRemainingPods--aborted?-->SendNil["returnCh <- nil"] RangeRemainingPods--attemptEvict?-->SendEvictErr["returnCh <- fmt.Errorf('pod evict error')"] RangeRemainingPods--else-->SendDelErr["returnCh <- fmt.Errorf('pod delete error')"] SendNil-->NilReturn SendEvictErr-->NilReturn SendDelErr-->NilReturn NilReturn(("return"))
drain.Options.evictPodsWithPVInternal
FIXME: name case inconsistency with evictPodsWithPv
drain.Options.evictPodsWithPVInternal is a drain helper method that actually evicts/deletes pods and waits for volume detachment. It returns a remainingPods slice and a fastTrack boolean that is meant to make the caller abort the pod eviction and exit the calling go-routine. (TODO: it should be called abort, or even better, a custom error should be used here.)
func (o *DrainOptions) evictPodsWithPVInternal(ctx context.Context,
attemptEvict bool,
pods []*corev1.Pod,
volMap map[string][]string,
policyGroupVersion string,
returnCh chan error
) (remainingPods []*api.Pod, fastTrack bool)
- Uses context.WithDeadline, passing in ctx and a deadline time after the drain timeout, to get a sub-context assigned to mainContext along with a CancelFunc. The obtained CancelFunc is deferred (so it is always invoked when the method terminates).
- Maintains a pod slice retryPods which is initially empty.
- Iterates through the pods slice with i as the index variable:
  - Apply a select with one check case:
    - Check to see if mainContext is closed/cancelled by attempting to read from the Done channel: <-mainContext.Done(). If this case matches:
      - Send nil on the return error channel: returnCh <- nil.
      - Compute remainingPods as the retryPods slice appended with the pods yet to be iterated: pods[i+1:]...
      - Return remainingPods, true. (aborted is true)
  - Record the pod eviction start time: podEvictionStartTime = time.Now().
  - Call volumeAttachmentHandler.AddWorker() to start tracking VolumeAttachments and obtain a volumeAttachmentEventCh receive channel that can be used to receive the attached or detached *VolumeAttachment objects.
  - If attemptEvict is true, call the evictPod helper method, else call the deletePod helper method. Grab the err from the eviction/deletion.
  - If the eviction/deletion had an error, analyze the err:
    - If both attemptEvict is true and apierrors.IsTooManyRequests(err) is true, this case is interpreted as an eviction failure due to PDB violation.
      - We get the PodDisruptionBudget for the pod being iterated.
      - We check whether it is misconfigured. If so, we send an error on returnCh, close the volumeAttachmentEventCh using volumeAttachmentHandler.DeleteWorker and continue with the next loop iteration, i.e. go to the next pod.
    - If just apierrors.IsNotFound(err) is true, this means that the Pod is already gone from the node. We send nil on returnCh, call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) and continue with the next pod in the iteration.
    - Otherwise we add the pod to the retryPods slice: retryPods = append(retryPods, pod), call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) and continue with the next pod in the iteration.
    - (NOTE: Error handling can be optimized; too much repetition.)
  - Log that the evict/delete was successful.
  - Get the PodVolumeInfo from volMap using the pod key.
  - Obtain a context and cancellation function for volume detachment using context.WithTimeout, passing in the mainContext and a detach timeout computed as the termination grace period of the pod (pod.Spec.TerminationGracePeriodSeconds, if not nil) added to PvDetachTimeout (from the drain options).
  - Invoke waitForDetach(ctx, podVolumeInfo, o.nodeName) and grab the err.
  - Invoke the cancel function for detach. NOTE: This is not good. The sub-context should be created INSIDE waitForDetach with a defer for the cancellation.
  - Analyze the detachment error:
    - If apierrors.IsNotFound(err) is true, this indicates that the node is not found.
      - Send nil on returnCh.
      - Call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh).
      - Compute remainingPods as the retryPods slice appended with the pods yet to be iterated: pods[i+1:]...
      - Return remainingPods, true. (aborted is true)
    - For other errors:
      - Send the err on returnCh.
      - Call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh).
      - Continue with the next pod in the iteration.
  - Obtain a context and cancellation function for volume re-attachment using context.WithTimeout, passing in the mainContext and drain.Options.PvReattachTimeout.
  - Invoke waitForReattach(ctx, podVolumeInfo, o.nodeName, volumeAttachmentEventCh) and grab the returned err.
  - Invoke the cancel function for reattach. NOTE: This is not good. The sub-context should be created INSIDE waitForReattach.
  - Analyze the re-attachment error:
    - If err is a reattachment timeout error, just log a warning. TODO: Confused on why we don't return an error on the return channel here.
    - Otherwise we send the err on returnCh, call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh) and continue with the next pod in the iteration.
  - YAWN. Someone is very fond of calling this again and again: call volumeAttachmentHandler.DeleteWorker(volumeAttachmentEventCh).
  - Log the time taken for pod eviction + volume detachment + volume attachment to another node using time.Since(podEvictionStartTime).
  - Send nil on returnCh.
- When the pod iteration loop is done: return retryPods, false.