mirror of
https://github.com/TECHNOFAB11/zfs-localpv.git
synced 2025-12-13 23:03:57 +01:00
refact(deps): bump k8s and client-go deps to version v0.20.2 (#294)
Signed-off-by: prateekpandey14 <prateek.pandey@mayadata.io>
This commit is contained in:
parent
533e17a9aa
commit
b1aa6ab51a
2196 changed files with 306727 additions and 251810 deletions
248
vendor/k8s.io/client-go/tools/cache/delta_fifo.go
generated
vendored
248
vendor/k8s.io/client-go/tools/cache/delta_fifo.go
generated
vendored
|
|
@ -23,24 +23,25 @@ import (
|
|||
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
|
||||
"k8s.io/klog"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// NewDeltaFIFO returns a Store which can be used process changes to items.
|
||||
// NewDeltaFIFO returns a Queue which can be used to process changes to items.
|
||||
//
|
||||
// keyFunc is used to figure out what key an object should have. (It's
|
||||
// exposed in the returned DeltaFIFO's KeyOf() method, with bonus features.)
|
||||
// keyFunc is used to figure out what key an object should have. (It is
|
||||
// exposed in the returned DeltaFIFO's KeyOf() method, with additional handling
|
||||
// around deleted objects and queue state).
|
||||
//
|
||||
// 'knownObjects' may be supplied to modify the behavior of Delete,
|
||||
// Replace, and Resync. It may be nil if you do not need those
|
||||
// modifications.
|
||||
//
|
||||
// 'keyLister' is expected to return a list of keys that the consumer of
|
||||
// this queue "knows about". It is used to decide which items are missing
|
||||
// when Replace() is called; 'Deleted' deltas are produced for these items.
|
||||
// It may be nil if you don't need to detect all deletions.
|
||||
// TODO: consider merging keyLister with this object, tracking a list of
|
||||
// "known" keys when Pop() is called. Have to think about how that
|
||||
// affects error retrying.
|
||||
// NOTE: It is possible to misuse this and cause a race when using an
|
||||
// external known object source.
|
||||
// Whether there is a potential race depends on how the comsumer
|
||||
// Whether there is a potential race depends on how the consumer
|
||||
// modifies knownObjects. In Pop(), process function is called under
|
||||
// lock, so it is safe to update data structures in it that need to be
|
||||
// in sync with the queue (e.g. knownObjects).
|
||||
|
|
@ -56,18 +57,79 @@ import (
|
|||
// and internal tests.
|
||||
//
|
||||
// Also see the comment on DeltaFIFO.
|
||||
//
|
||||
// Warning: This constructs a DeltaFIFO that does not differentiate between
|
||||
// events caused by a call to Replace (e.g., from a relist, which may
|
||||
// contain object updates), and synthetic events caused by a periodic resync
|
||||
// (which just emit the existing object). See https://issue.k8s.io/86015 for details.
|
||||
//
|
||||
// Use `NewDeltaFIFOWithOptions(DeltaFIFOOptions{..., EmitDeltaTypeReplaced: true})`
|
||||
// instead to receive a `Replaced` event depending on the type.
|
||||
//
|
||||
// Deprecated: Equivalent to NewDeltaFIFOWithOptions(DeltaFIFOOptions{KeyFunction: keyFunc, KnownObjects: knownObjects})
|
||||
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
|
||||
return NewDeltaFIFOWithOptions(DeltaFIFOOptions{
|
||||
KeyFunction: keyFunc,
|
||||
KnownObjects: knownObjects,
|
||||
})
|
||||
}
|
||||
|
||||
// DeltaFIFOOptions is the configuration parameters for DeltaFIFO. All are
|
||||
// optional.
|
||||
type DeltaFIFOOptions struct {
|
||||
|
||||
// KeyFunction is used to figure out what key an object should have. (It's
|
||||
// exposed in the returned DeltaFIFO's KeyOf() method, with additional
|
||||
// handling around deleted objects and queue state).
|
||||
// Optional, the default is MetaNamespaceKeyFunc.
|
||||
KeyFunction KeyFunc
|
||||
|
||||
// KnownObjects is expected to return a list of keys that the consumer of
|
||||
// this queue "knows about". It is used to decide which items are missing
|
||||
// when Replace() is called; 'Deleted' deltas are produced for the missing items.
|
||||
// KnownObjects may be nil if you can tolerate missing deletions on Replace().
|
||||
KnownObjects KeyListerGetter
|
||||
|
||||
// EmitDeltaTypeReplaced indicates that the queue consumer
|
||||
// understands the Replaced DeltaType. Before the `Replaced` event type was
|
||||
// added, calls to Replace() were handled the same as Sync(). For
|
||||
// backwards-compatibility purposes, this is false by default.
|
||||
// When true, `Replaced` events will be sent for items passed to a Replace() call.
|
||||
// When false, `Sync` events will be sent instead.
|
||||
EmitDeltaTypeReplaced bool
|
||||
}
|
||||
|
||||
// NewDeltaFIFOWithOptions returns a Queue which can be used to process changes to
|
||||
// items. See also the comment on DeltaFIFO.
|
||||
func NewDeltaFIFOWithOptions(opts DeltaFIFOOptions) *DeltaFIFO {
|
||||
if opts.KeyFunction == nil {
|
||||
opts.KeyFunction = MetaNamespaceKeyFunc
|
||||
}
|
||||
|
||||
f := &DeltaFIFO{
|
||||
items: map[string]Deltas{},
|
||||
queue: []string{},
|
||||
keyFunc: keyFunc,
|
||||
knownObjects: knownObjects,
|
||||
keyFunc: opts.KeyFunction,
|
||||
knownObjects: opts.KnownObjects,
|
||||
|
||||
emitDeltaTypeReplaced: opts.EmitDeltaTypeReplaced,
|
||||
}
|
||||
f.cond.L = &f.lock
|
||||
return f
|
||||
}
|
||||
|
||||
// DeltaFIFO is like FIFO, but allows you to process deletes.
|
||||
// DeltaFIFO is like FIFO, but differs in two ways. One is that the
|
||||
// accumulator associated with a given object's key is not that object
|
||||
// but rather a Deltas, which is a slice of Delta values for that
|
||||
// object. Applying an object to a Deltas means to append a Delta
|
||||
// except when the potentially appended Delta is a Deleted and the
|
||||
// Deltas already ends with a Deleted. In that case the Deltas does
|
||||
// not grow, although the terminal Deleted will be replaced by the new
|
||||
// Deleted if the older Deleted's object is a
|
||||
// DeletedFinalStateUnknown.
|
||||
//
|
||||
// The other difference is that DeltaFIFO has an additional way that
|
||||
// an object can be applied to an accumulator, called Sync.
|
||||
//
|
||||
// DeltaFIFO is a producer-consumer queue, where a Reflector is
|
||||
// intended to be the producer, and the consumer is whatever calls
|
||||
|
|
@ -77,35 +139,39 @@ func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
|
|||
// * You want to process every object change (delta) at most once.
|
||||
// * When you process an object, you want to see everything
|
||||
// that's happened to it since you last processed it.
|
||||
// * You want to process the deletion of objects.
|
||||
// * You want to process the deletion of some of the objects.
|
||||
// * You might want to periodically reprocess objects.
|
||||
//
|
||||
// DeltaFIFO's Pop(), Get(), and GetByKey() methods return
|
||||
// interface{} to satisfy the Store/Queue interfaces, but it
|
||||
// will always return an object of type Deltas.
|
||||
// interface{} to satisfy the Store/Queue interfaces, but they
|
||||
// will always return an object of type Deltas. List() returns
|
||||
// the newest object from each accumulator in the FIFO.
|
||||
//
|
||||
// A DeltaFIFO's knownObjects KeyListerGetter provides the abilities
|
||||
// to list Store keys and to get objects by Store key. The objects in
|
||||
// question are called "known objects" and this set of objects
|
||||
// modifies the behavior of the Delete, Replace, and Resync methods
|
||||
// (each in a different way).
|
||||
//
|
||||
// A note on threading: If you call Pop() in parallel from multiple
|
||||
// threads, you could end up with multiple threads processing slightly
|
||||
// different versions of the same object.
|
||||
//
|
||||
// A note on the KeyLister used by the DeltaFIFO: It's main purpose is
|
||||
// to list keys that are "known", for the purpose of figuring out which
|
||||
// items have been deleted when Replace() or Delete() are called. The deleted
|
||||
// object will be included in the DeleteFinalStateUnknown markers. These objects
|
||||
// could be stale.
|
||||
type DeltaFIFO struct {
|
||||
// lock/cond protects access to 'items' and 'queue'.
|
||||
lock sync.RWMutex
|
||||
cond sync.Cond
|
||||
|
||||
// We depend on the property that items in the set are in
|
||||
// the queue and vice versa, and that all Deltas in this
|
||||
// map have at least one Delta.
|
||||
// `items` maps a key to a Deltas.
|
||||
// Each such Deltas has at least one Delta.
|
||||
items map[string]Deltas
|
||||
|
||||
// `queue` maintains FIFO order of keys for consumption in Pop().
|
||||
// There are no duplicates in `queue`.
|
||||
// A key is in `queue` if and only if it is in `items`.
|
||||
queue []string
|
||||
|
||||
// populated is true if the first batch of items inserted by Replace() has been populated
|
||||
// or Delete/Add/Update was called first.
|
||||
// or Delete/Add/Update/AddIfNotPresent was called first.
|
||||
populated bool
|
||||
// initialPopulationCount is the number of items inserted by the first call of Replace()
|
||||
initialPopulationCount int
|
||||
|
|
@ -114,16 +180,17 @@ type DeltaFIFO struct {
|
|||
// insertion and retrieval, and should be deterministic.
|
||||
keyFunc KeyFunc
|
||||
|
||||
// knownObjects list keys that are "known", for the
|
||||
// purpose of figuring out which items have been deleted
|
||||
// when Replace() or Delete() is called.
|
||||
// knownObjects list keys that are "known" --- affecting Delete(),
|
||||
// Replace(), and Resync()
|
||||
knownObjects KeyListerGetter
|
||||
|
||||
// Indication the queue is closed.
|
||||
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
|
||||
// Currently, not used to gate any of CRED operations.
|
||||
closed bool
|
||||
closedLock sync.Mutex
|
||||
closed bool
|
||||
|
||||
// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
|
||||
// DeltaType when Replace() is called (to preserve backwards compat).
|
||||
emitDeltaTypeReplaced bool
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
@ -139,8 +206,8 @@ var (
|
|||
|
||||
// Close the queue.
|
||||
func (f *DeltaFIFO) Close() {
|
||||
f.closedLock.Lock()
|
||||
defer f.closedLock.Unlock()
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.closed = true
|
||||
f.cond.Broadcast()
|
||||
}
|
||||
|
|
@ -160,8 +227,8 @@ func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
|
|||
return f.keyFunc(obj)
|
||||
}
|
||||
|
||||
// Return true if an Add/Update/Delete/AddIfNotPresent are called first,
|
||||
// or an Update called first but the first batch of items inserted by Replace() has been popped
|
||||
// HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
|
||||
// or the first batch of items inserted by Replace() has been popped.
|
||||
func (f *DeltaFIFO) HasSynced() bool {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
|
@ -185,9 +252,11 @@ func (f *DeltaFIFO) Update(obj interface{}) error {
|
|||
return f.queueActionLocked(Updated, obj)
|
||||
}
|
||||
|
||||
// Delete is just like Add, but makes an Deleted Delta. If the item does not
|
||||
// already exist, it will be ignored. (It may have already been deleted by a
|
||||
// Replace (re-list), for example.
|
||||
// Delete is just like Add, but makes a Deleted Delta. If the given
|
||||
// object does not already exist, it will be ignored. (It may have
|
||||
// already been deleted by a Replace (re-list), for example.) In this
|
||||
// method `f.knownObjects`, if not nil, provides (via GetByKey)
|
||||
// _additional_ objects that are considered to already exist.
|
||||
func (f *DeltaFIFO) Delete(obj interface{}) error {
|
||||
id, err := f.KeyOf(obj)
|
||||
if err != nil {
|
||||
|
|
@ -216,6 +285,7 @@ func (f *DeltaFIFO) Delete(obj interface{}) error {
|
|||
}
|
||||
}
|
||||
|
||||
// exist in items and/or KnownObjects
|
||||
return f.queueActionLocked(Deleted, obj)
|
||||
}
|
||||
|
||||
|
|
@ -266,6 +336,11 @@ func dedupDeltas(deltas Deltas) Deltas {
|
|||
a := &deltas[n-1]
|
||||
b := &deltas[n-2]
|
||||
if out := isDup(a, b); out != nil {
|
||||
// `a` and `b` are duplicates. Only keep the one returned from isDup().
|
||||
// TODO: This extra array allocation and copy seems unnecessary if
|
||||
// all we do to dedup is compare the new delta with the last element
|
||||
// in `items`, which could be done by mutating `items` directly.
|
||||
// Might be worth profiling and investigating if it is safe to optimize.
|
||||
d := append(Deltas{}, deltas[:n-2]...)
|
||||
return append(d, *out)
|
||||
}
|
||||
|
|
@ -302,8 +377,8 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
|||
if err != nil {
|
||||
return KeyError{obj, err}
|
||||
}
|
||||
|
||||
newDeltas := append(f.items[id], Delta{actionType, obj})
|
||||
oldDeltas := f.items[id]
|
||||
newDeltas := append(oldDeltas, Delta{actionType, obj})
|
||||
newDeltas = dedupDeltas(newDeltas)
|
||||
|
||||
if len(newDeltas) > 0 {
|
||||
|
|
@ -313,9 +388,16 @@ func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) err
|
|||
f.items[id] = newDeltas
|
||||
f.cond.Broadcast()
|
||||
} else {
|
||||
// We need to remove this from our map (extra items in the queue are
|
||||
// ignored if they are not in the map).
|
||||
delete(f.items, id)
|
||||
// This never happens, because dedupDeltas never returns an empty list
|
||||
// when given a non-empty list (as it is here).
|
||||
// If somehow it happens anyway, deal with it but complain.
|
||||
if oldDeltas == nil {
|
||||
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; ignoring", id, oldDeltas, obj)
|
||||
return nil
|
||||
}
|
||||
klog.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; breaking invariant by storing empty Deltas", id, oldDeltas, obj)
|
||||
f.items[id] = newDeltas
|
||||
return fmt.Errorf("Impossible dedupDeltas for id=%q: oldDeltas=%#+v, obj=%#+v; broke DeltaFIFO invariant by storing empty Deltas", id, oldDeltas, obj)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -375,22 +457,24 @@ func (f *DeltaFIFO) GetByKey(key string) (item interface{}, exists bool, err err
|
|||
return d, exists, nil
|
||||
}
|
||||
|
||||
// Checks if the queue is closed
|
||||
// IsClosed checks if the queue is closed
|
||||
func (f *DeltaFIFO) IsClosed() bool {
|
||||
f.closedLock.Lock()
|
||||
defer f.closedLock.Unlock()
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
return f.closed
|
||||
}
|
||||
|
||||
// Pop blocks until an item is added to the queue, and then returns it. If
|
||||
// Pop blocks until the queue has some items, and then returns one. If
|
||||
// multiple items are ready, they are returned in the order in which they were
|
||||
// added/updated. The item is removed from the queue (and the store) before it
|
||||
// is returned, so if you don't successfully process it, you need to add it back
|
||||
// with AddIfNotPresent().
|
||||
// process function is called under lock, so it is safe update data structures
|
||||
// process function is called under lock, so it is safe to update data structures
|
||||
// in it that need to be in sync with the queue (e.g. knownKeys). The PopProcessFunc
|
||||
// may return an instance of ErrRequeue with a nested error to indicate the current
|
||||
// item should be requeued (equivalent to calling AddIfNotPresent under the lock).
|
||||
// process should avoid expensive I/O operation so that other queue operations, i.e.
|
||||
// Add() and Get(), won't be blocked for too long.
|
||||
//
|
||||
// Pop returns a 'Deltas', which has a complete list of all the things
|
||||
// that happened to the object (deltas) while it was sitting in the queue.
|
||||
|
|
@ -402,8 +486,8 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
|||
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
|
||||
// When Close() is called, the f.closed is set and the condition is broadcasted.
|
||||
// Which causes this loop to continue and return from the Pop().
|
||||
if f.IsClosed() {
|
||||
return nil, FIFOClosedError
|
||||
if f.closed {
|
||||
return nil, ErrFIFOClosed
|
||||
}
|
||||
|
||||
f.cond.Wait()
|
||||
|
|
@ -415,7 +499,8 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
|||
}
|
||||
item, ok := f.items[id]
|
||||
if !ok {
|
||||
// Item may have been deleted subsequently.
|
||||
// This should never happen
|
||||
klog.Errorf("Inconceivable! %q was in f.queue but not f.items; ignoring.", id)
|
||||
continue
|
||||
}
|
||||
delete(f.items, id)
|
||||
|
|
@ -430,22 +515,35 @@ func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
|
|||
}
|
||||
}
|
||||
|
||||
// Replace will delete the contents of 'f', using instead the given map.
|
||||
// 'f' takes ownership of the map, you should not reference the map again
|
||||
// after calling this function. f's queue is reset, too; upon return, it
|
||||
// will contain the items in the map, in no particular order.
|
||||
// Replace atomically does two things: (1) it adds the given objects
|
||||
// using the Sync or Replace DeltaType and then (2) it does some deletions.
|
||||
// In particular: for every pre-existing key K that is not the key of
|
||||
// an object in `list` there is the effect of
|
||||
// `Delete(DeletedFinalStateUnknown{K, O})` where O is current object
|
||||
// of K. If `f.knownObjects == nil` then the pre-existing keys are
|
||||
// those in `f.items` and the current object of K is the `.Newest()`
|
||||
// of the Deltas associated with K. Otherwise the pre-existing keys
|
||||
// are those listed by `f.knownObjects` and the current object of K is
|
||||
// what `f.knownObjects.GetByKey(K)` returns.
|
||||
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
keys := make(sets.String, len(list))
|
||||
|
||||
// keep backwards compat for old clients
|
||||
action := Sync
|
||||
if f.emitDeltaTypeReplaced {
|
||||
action = Replaced
|
||||
}
|
||||
|
||||
// Add Sync/Replaced action for each new item.
|
||||
for _, item := range list {
|
||||
key, err := f.KeyOf(item)
|
||||
if err != nil {
|
||||
return KeyError{item, err}
|
||||
}
|
||||
keys.Insert(key)
|
||||
if err := f.queueActionLocked(Sync, item); err != nil {
|
||||
if err := f.queueActionLocked(action, item); err != nil {
|
||||
return fmt.Errorf("couldn't enqueue object: %v", err)
|
||||
}
|
||||
}
|
||||
|
|
@ -457,6 +555,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
|||
if keys.Has(k) {
|
||||
continue
|
||||
}
|
||||
// Delete pre-existing items not in the new list.
|
||||
// This could happen if watch deletion event was missed while
|
||||
// disconnected from apiserver.
|
||||
var deletedObj interface{}
|
||||
if n := oldItem.Newest(); n != nil {
|
||||
deletedObj = n.Object
|
||||
|
|
@ -507,7 +608,9 @@ func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Resync will send a sync event for each item
|
||||
// Resync adds, with a Sync type of Delta, every object listed by
|
||||
// `f.knownObjects` whose key is not already queued for processing.
|
||||
// If `f.knownObjects` is `nil` then Resync does nothing.
|
||||
func (f *DeltaFIFO) Resync() error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
|
@ -525,13 +628,6 @@ func (f *DeltaFIFO) Resync() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (f *DeltaFIFO) syncKey(key string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
|
||||
return f.syncKeyLocked(key)
|
||||
}
|
||||
|
||||
func (f *DeltaFIFO) syncKeyLocked(key string) error {
|
||||
obj, exists, err := f.knownObjects.GetByKey(key)
|
||||
if err != nil {
|
||||
|
|
@ -573,20 +669,26 @@ type KeyLister interface {
|
|||
|
||||
// A KeyGetter is anything that knows how to get the value stored under a given key.
|
||||
type KeyGetter interface {
|
||||
GetByKey(key string) (interface{}, bool, error)
|
||||
// GetByKey returns the value associated with the key, or sets exists=false.
|
||||
GetByKey(key string) (value interface{}, exists bool, err error)
|
||||
}
|
||||
|
||||
// DeltaType is the type of a change (addition, deletion, etc)
|
||||
type DeltaType string
|
||||
|
||||
// Change type definition
|
||||
const (
|
||||
Added DeltaType = "Added"
|
||||
Updated DeltaType = "Updated"
|
||||
Deleted DeltaType = "Deleted"
|
||||
// The other types are obvious. You'll get Sync deltas when:
|
||||
// * A watch expires/errors out and a new list/watch cycle is started.
|
||||
// * You've turned on periodic syncs.
|
||||
// (Anything that trigger's DeltaFIFO's Replace() method.)
|
||||
// Replaced is emitted when we encountered watch errors and had to do a
|
||||
// relist. We don't know if the replaced object has changed.
|
||||
//
|
||||
// NOTE: Previous versions of DeltaFIFO would use Sync for Replace events
|
||||
// as well. Hence, Replaced is only emitted when the option
|
||||
// EmitDeltaTypeReplaced is true.
|
||||
Replaced DeltaType = "Replaced"
|
||||
// Sync is for synthetic events during a periodic resync.
|
||||
Sync DeltaType = "Sync"
|
||||
)
|
||||
|
||||
|
|
@ -631,10 +733,10 @@ func copyDeltas(d Deltas) Deltas {
|
|||
return d2
|
||||
}
|
||||
|
||||
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where
|
||||
// an object was deleted but the watch deletion event was missed. In this
|
||||
// case we don't know the final "resting" state of the object, so there's
|
||||
// a chance the included `Obj` is stale.
|
||||
// DeletedFinalStateUnknown is placed into a DeltaFIFO in the case where an object
|
||||
// was deleted but the watch deletion event was missed while disconnected from
|
||||
// apiserver. In this case we don't know the final "resting" state of the object, so
|
||||
// there's a chance the included `Obj` is stale.
|
||||
type DeletedFinalStateUnknown struct {
|
||||
Key string
|
||||
Obj interface{}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue