diff --git a/buildscripts/generate-manifests.sh b/buildscripts/generate-manifests.sh index 3a242b2..3894a13 100755 --- a/buildscripts/generate-manifests.sh +++ b/buildscripts/generate-manifests.sh @@ -92,6 +92,20 @@ echo ' cat deploy/yamls/zfs.openebs.io_zfsrestores.yaml >> deploy/yamls/zfsrestore-crd.yaml rm deploy/yamls/zfs.openebs.io_zfsrestores.yaml +echo ' +############################################## +########### ############ +########### ZFSNode CRD ############ +########### ############ +############################################## + +# ZFSNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition' > deploy/yamls/zfsnode-crd.yaml + +cat deploy/yamls/zfs.openebs.io_zfsnodes.yaml >> deploy/yamls/zfsnode-crd.yaml +rm deploy/yamls/zfs.openebs.io_zfsnodes.yaml + ## create the operator file using all the yamls echo '# This manifest is autogenerated via `make manifests` command @@ -117,6 +131,9 @@ cat deploy/yamls/zfsbackup-crd.yaml >> deploy/zfs-operator.yaml # Add ZFSRestore v1 CRDs to the Operator yaml cat deploy/yamls/zfsrestore-crd.yaml >> deploy/zfs-operator.yaml +# Add ZFSNode v1alpha1 CRDs to the Operator yaml +cat deploy/yamls/zfsnode-crd.yaml >> deploy/zfs-operator.yaml + # Add the driver deployment to the Operator yaml cat deploy/yamls/zfs-driver.yaml >> deploy/zfs-operator.yaml diff --git a/deploy/yamls/zfs-driver.yaml b/deploy/yamls/zfs-driver.yaml index b0f04af..e4384f1 100644 --- a/deploy/yamls/zfs-driver.yaml +++ b/deploy/yamls/zfs-driver.yaml @@ -677,6 +677,9 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["storageclasses", "csinodes"] verbs: ["get", "list", "watch"] + - apiGroups: [ "storage.k8s.io" ] + resources: [ "csistoragecapacities"] + verbs: ["*"] - apiGroups: [""] resources: ["events"] verbs: ["list", "watch", "create", "update", "patch"] @@ -690,7 +693,7 @@ rules: resources: ["pods"] verbs: ["get", "list", "watch", "update", "patch"] - apiGroups: ["*"] - resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores"] + resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores", "zfsnodes"] verbs: ["*"] --- @@ -897,7 +900,7 @@ rules: resources: ["persistentvolumes", "nodes", "services"] verbs: ["get", "list"] - apiGroups: ["*"] - resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores"] + resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores", "zfsnodes"] verbs: ["get", "list", "watch", "create", "update", "patch"] --- diff --git a/deploy/yamls/zfsnode-crd.yaml b/deploy/yamls/zfsnode-crd.yaml new file mode 100644 index 0000000..26ad2c5 --- /dev/null +++ b/deploy/yamls/zfsnode-crd.yaml @@ -0,0 +1,87 @@ + +############################################## +########### ############ +########### ZFSNode CRD ############ +########### ############ +############################################## + +# ZFSNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.4.0 + creationTimestamp: null + name: zfsnodes.zfs.openebs.io +spec: + group: zfs.openebs.io + names: + kind: ZFSNode + listKind: ZFSNodeList + plural: zfsnodes + shortNames: + - zfsnode + singular: zfsnode + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + description: ZFSNode records information about all zfs pools available in + a node. In general, the openebs node-agent creates the ZFSNode object & + periodically synchronizing the zfs pools available in the node. ZFSNode + has an owner reference pointing to the corresponding node object. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + pools: + items: + description: Pool specifies attributes of a given zfs pool exists on + node. + properties: + free: + anyOf: + - type: integer + - type: string + description: Free specifies the available capacity of zfs pool. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + name: + description: Name of the zfs zfs pool. + minLength: 1 + type: string + uuid: + description: UUID denotes a unique identity of a zfs pool. + minLength: 1 + type: string + required: + - free + - name + - uuid + type: object + type: array + required: + - pools + type: object + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] diff --git a/deploy/zfs-operator.yaml b/deploy/zfs-operator.yaml index 59ff609..5a96733 100644 --- a/deploy/zfs-operator.yaml +++ b/deploy/zfs-operator.yaml @@ -1206,6 +1206,93 @@ status: conditions: [] storedVersions: [] +############################################## +########### ############ +########### ZFSNode CRD ############ +########### ############ +############################################## + +# ZFSNode CRD is autogenerated via `make manifests` command. +# Do the modification in the code and run the `make manifests` command +# to generate the CRD definition + +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + annotations: + controller-gen.kubebuilder.io/version: v0.4.0 + creationTimestamp: null + name: zfsnodes.zfs.openebs.io +spec: + group: zfs.openebs.io + names: + kind: ZFSNode + listKind: ZFSNodeList + plural: zfsnodes + shortNames: + - zfsnode + singular: zfsnode + scope: Namespaced + versions: + - name: v1 + schema: + openAPIV3Schema: + description: ZFSNode records information about all zfs pools available in + a node. In general, the openebs node-agent creates the ZFSNode object & + periodically synchronizing the zfs pools available in the node. ZFSNode + has an owner reference pointing to the corresponding node object. + properties: + apiVersion: + description: 'APIVersion defines the versioned schema of this representation + of an object. Servers should convert recognized schemas to the latest + internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#resources' + type: string + kind: + description: 'Kind is a string value representing the REST resource this + object represents. Servers may infer this from the endpoint the client + submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#types-kinds' + type: string + metadata: + type: object + pools: + items: + description: Pool specifies attributes of a given zfs pool exists on + node. + properties: + free: + anyOf: + - type: integer + - type: string + description: Free specifies the available capacity of zfs pool. + pattern: ^(\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))(([KMGTPE]i)|[numkMGTPE]|([eE](\+|-)?(([0-9]+(\.[0-9]*)?)|(\.[0-9]+))))?$ + x-kubernetes-int-or-string: true + name: + description: Name of the zfs zfs pool. + minLength: 1 + type: string + uuid: + description: UUID denotes a unique identity of a zfs pool. + minLength: 1 + type: string + required: + - free + - name + - uuid + type: object + type: array + required: + - pools + type: object + served: true + storage: true +status: + acceptedNames: + kind: "" + plural: "" + conditions: [] + storedVersions: [] + --- # Create the CSI Driver object @@ -1884,6 +1971,9 @@ rules: - apiGroups: ["storage.k8s.io"] resources: ["storageclasses", "csinodes"] verbs: ["get", "list", "watch"] + - apiGroups: [ "storage.k8s.io" ] + resources: [ "csistoragecapacities"] + verbs: ["*"] - apiGroups: [""] resources: ["events"] verbs: ["list", "watch", "create", "update", "patch"] @@ -1897,7 +1987,7 @@ rules: resources: ["pods"] verbs: ["get", "list", "watch", "update", "patch"] - apiGroups: ["*"] - resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores"] + resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores", "zfsnodes"] verbs: ["*"] --- @@ -2104,7 +2194,7 @@ rules: resources: ["persistentvolumes", "nodes", "services"] verbs: ["get", "list"] - apiGroups: ["*"] - resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores"] + resources: ["zfsvolumes", "zfssnapshots", "zfsbackups", "zfsrestores", "zfsnodes"] verbs: ["get", "list", "watch", "create", "update", "patch"] --- diff --git a/pkg/apis/openebs.io/zfs/v1/register.go b/pkg/apis/openebs.io/zfs/v1/register.go index 544b4d0..795a85d 100644 --- a/pkg/apis/openebs.io/zfs/v1/register.go +++ b/pkg/apis/openebs.io/zfs/v1/register.go @@ -77,6 +77,8 @@ func addKnownTypes(scheme *runtime.Scheme) error { &ZFSBackupList{}, &ZFSRestore{}, &ZFSRestoreList{}, + &ZFSNode{}, + &ZFSNodeList{}, ) metav1.AddToGroupVersion(scheme, SchemeGroupVersion) return nil diff --git a/pkg/apis/openebs.io/zfs/v1/zfsnode.go b/pkg/apis/openebs.io/zfs/v1/zfsnode.go new file mode 100644 index 0000000..75a5ed8 --- /dev/null +++ b/pkg/apis/openebs.io/zfs/v1/zfsnode.go @@ -0,0 +1,66 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package v1 + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// +genclient +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=zfsnode + +// ZFSNode records information about all zfs pools available +// in a node. In general, the openebs node-agent creates the ZFSNode +// object & periodically synchronizing the zfs pools available in the node. +// ZFSNode has an owner reference pointing to the corresponding node object. +// +kubebuilder:object:root=true +// +kubebuilder:resource:scope=Namespaced,shortName=zfsnode +type ZFSNode struct { + metav1.TypeMeta `json:",inline"` + metav1.ObjectMeta `json:"metadata,omitempty"` + + Pools []Pool `json:"pools"` +} + +// Pool specifies attributes of a given zfs pool that exists on the node. +type Pool struct { + // Name of the zfs pool. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + Name string `json:"name"` + + // UUID denotes a unique identity of a zfs pool. + // +kubebuilder:validation:Required + // +kubebuilder:validation:MinLength=1 + UUID string `json:"uuid"` + + // Free specifies the available capacity of zfs pool. + // +kubebuilder:validation:Required + Free resource.Quantity `json:"free"` +} + +// ZFSNodeList is a collection of ZFSNode resources +// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object +// +resource:path=zfsnodes +type ZFSNodeList struct { + metav1.TypeMeta `json:",inline"` + metav1.ListMeta `json:"metadata"` + + Items []ZFSNode `json:"items"` +} diff --git a/pkg/apis/openebs.io/zfs/v1/zz_generated.deepcopy.go b/pkg/apis/openebs.io/zfs/v1/zz_generated.deepcopy.go index e16e707..28b86af 100644 --- a/pkg/apis/openebs.io/zfs/v1/zz_generated.deepcopy.go +++ b/pkg/apis/openebs.io/zfs/v1/zz_generated.deepcopy.go @@ -24,6 +24,23 @@ import ( runtime "k8s.io/apimachinery/pkg/runtime" ) +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *Pool) DeepCopyInto(out *Pool) { + *out = *in + out.Free = in.Free.DeepCopy() + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new Pool. +func (in *Pool) DeepCopy() *Pool { + if in == nil { + return nil + } + out := new(Pool) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *SnapStatus) DeepCopyInto(out *SnapStatus) { *out = *in @@ -148,6 +165,72 @@ func (in *ZFSBackupSpec) DeepCopy() *ZFSBackupSpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ZFSNode) DeepCopyInto(out *ZFSNode) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ObjectMeta.DeepCopyInto(&out.ObjectMeta) + if in.Pools != nil { + in, out := &in.Pools, &out.Pools + *out = make([]Pool, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ZFSNode. +func (in *ZFSNode) DeepCopy() *ZFSNode { + if in == nil { + return nil + } + out := new(ZFSNode) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ZFSNode) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *ZFSNodeList) DeepCopyInto(out *ZFSNodeList) { + *out = *in + out.TypeMeta = in.TypeMeta + in.ListMeta.DeepCopyInto(&out.ListMeta) + if in.Items != nil { + in, out := &in.Items, &out.Items + *out = make([]ZFSNode, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } + return +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ZFSNodeList. +func (in *ZFSNodeList) DeepCopy() *ZFSNodeList { + if in == nil { + return nil + } + out := new(ZFSNodeList) + in.DeepCopyInto(out) + return out +} + +// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object. +func (in *ZFSNodeList) DeepCopyObject() runtime.Object { + if c := in.DeepCopy(); c != nil { + return c + } + return nil +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *ZFSRestore) DeepCopyInto(out *ZFSRestore) { *out = *in diff --git a/pkg/builder/nodebuilder/kubernetes.go b/pkg/builder/nodebuilder/kubernetes.go new file mode 100644 index 0000000..ce9b596 --- /dev/null +++ b/pkg/builder/nodebuilder/kubernetes.go @@ -0,0 +1,428 @@ +// Copyright © 2021 The OpenEBS Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package nodebuilder + +import ( + "context" + "encoding/json" + + client "github.com/openebs/lib-csi/pkg/common/kubernetes/client" + apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// getClientsetFn is a typed function that +// abstracts fetching of internal clientset +type getClientsetFn func() (clientset *clientset.Clientset, err error) + +// getClientsetFromPathFn is a typed function that +// abstracts fetching of clientset from kubeConfigPath +type getClientsetForPathFn func(kubeConfigPath string) ( + clientset *clientset.Clientset, + err error, +) + +// createFn is a typed function that abstracts +// creating zfs node instance +type createFn func( + cs *clientset.Clientset, + upgradeResultObj *apis.ZFSNode, + namespace string, +) (*apis.ZFSNode, error) + +// getFn is a typed function that abstracts +// fetching a zfs node instance +type getFn func( + cli *clientset.Clientset, + name, + namespace string, + opts metav1.GetOptions, +) (*apis.ZFSNode, error) + +// listFn is a typed function that abstracts +// listing of zfs node instances +type listFn func( + cli *clientset.Clientset, + namespace string, + opts metav1.ListOptions, +) (*apis.ZFSNodeList, error) + +// delFn is a typed function that abstracts +// deleting a zfs node instance +type delFn func( + cli *clientset.Clientset, + name, + namespace string, + opts *metav1.DeleteOptions, +) error + +// updateFn is a typed function that abstracts +// updating zfs node instance +type updateFn func( + cs *clientset.Clientset, + node *apis.ZFSNode, + namespace string, +) (*apis.ZFSNode, error) + +// Kubeclient enables kubernetes API operations +// on zfs node instance +type Kubeclient struct { + // clientset refers to zfs node's + // clientset that will be responsible to + // make kubernetes API calls + clientset *clientset.Clientset + + kubeConfigPath string + + // namespace holds the namespace on which + // kubeclient has to operate + namespace string + + // functions useful during mocking + getClientset getClientsetFn + getClientsetForPath getClientsetForPathFn + get getFn + list listFn + del delFn + create createFn + update updateFn +} + +// KubeclientBuildOption defines the abstraction +// to build a kubeclient instance +type KubeclientBuildOption func(*Kubeclient) + +// defaultGetClientset is the default implementation to +// get kubernetes clientset instance +func defaultGetClientset() (clients *clientset.Clientset, err error) { + + config, err := client.GetConfig(client.New()) + if err != nil { + return nil, err + } + + return clientset.NewForConfig(config) + +} + +// defaultGetClientsetForPath is the default implementation to +// get kubernetes clientset instance based on the given +// kubeconfig path +func defaultGetClientsetForPath( + kubeConfigPath string, +) (clients *clientset.Clientset, err error) { + config, err := client.GetConfig( + client.New(client.WithKubeConfigPath(kubeConfigPath))) + if err != nil { + return nil, err + } + + return clientset.NewForConfig(config) +} + +// defaultGet is the default implementation to get +// a zfs node instance in kubernetes cluster +func defaultGet( + cli *clientset.Clientset, + name, namespace string, + opts metav1.GetOptions, +) (*apis.ZFSNode, error) { + return cli.ZfsV1(). + ZFSNodes(namespace). + Get(context.TODO(), name, opts) +} + +// defaultList is the default implementation to list +// zfs node instances in kubernetes cluster +func defaultList( + cli *clientset.Clientset, + namespace string, + opts metav1.ListOptions, +) (*apis.ZFSNodeList, error) { + return cli.ZfsV1(). + ZFSNodes(namespace). + List(context.TODO(), opts) +} + +// defaultCreate is the default implementation to delete +// a zfs node instance in kubernetes cluster +func defaultDel( + cli *clientset.Clientset, + name, namespace string, + opts *metav1.DeleteOptions, +) error { + deletePropagation := metav1.DeletePropagationForeground + opts.PropagationPolicy = &deletePropagation + err := cli.ZfsV1(). + ZFSNodes(namespace). + Delete(context.TODO(), name, *opts) + return err +} + +// defaultCreate is the default implementation to create +// a zfs node instance in kubernetes cluster +func defaultCreate( + cli *clientset.Clientset, + node *apis.ZFSNode, + namespace string, +) (*apis.ZFSNode, error) { + return cli.ZfsV1(). + ZFSNodes(namespace). + Create(context.TODO(), node, metav1.CreateOptions{}) +} + +// defaultUpdate is the default implementation to update +// a zfs node instance in kubernetes cluster +func defaultUpdate( + cli *clientset.Clientset, + node *apis.ZFSNode, + namespace string, +) (*apis.ZFSNode, error) { + return cli.ZfsV1(). + ZFSNodes(namespace). + Update(context.TODO(), node, metav1.UpdateOptions{}) +} + +// withDefaults sets the default options +// of kubeclient instance +func (k *Kubeclient) withDefaults() { + if k.getClientset == nil { + k.getClientset = defaultGetClientset + } + if k.getClientsetForPath == nil { + k.getClientsetForPath = defaultGetClientsetForPath + } + if k.get == nil { + k.get = defaultGet + } + if k.list == nil { + k.list = defaultList + } + if k.del == nil { + k.del = defaultDel + } + if k.create == nil { + k.create = defaultCreate + } + if k.update == nil { + k.update = defaultUpdate + } +} + +// WithClientSet sets the kubernetes client against +// the kubeclient instance +func WithClientSet(c *clientset.Clientset) KubeclientBuildOption { + return func(k *Kubeclient) { + k.clientset = c + } +} + +// WithNamespace sets the kubernetes client against +// the provided namespace +func WithNamespace(namespace string) KubeclientBuildOption { + return func(k *Kubeclient) { + k.namespace = namespace + } +} + +// WithNamespace sets the provided namespace +// against this Kubeclient instance +func (k *Kubeclient) WithNamespace(namespace string) *Kubeclient { + k.namespace = namespace + return k +} + +// WithKubeConfigPath sets the kubernetes client +// against the provided path +func WithKubeConfigPath(path string) KubeclientBuildOption { + return func(k *Kubeclient) { + k.kubeConfigPath = path + } +} + +// NewKubeclient returns a new instance of +// kubeclient meant for zfs node operations +func NewKubeclient(opts ...KubeclientBuildOption) *Kubeclient { + k := &Kubeclient{} + for _, o := range opts { + o(k) + } + + k.withDefaults() + return k +} + +func (k *Kubeclient) getClientsetForPathOrDirect() ( + *clientset.Clientset, + error, +) { + if k.kubeConfigPath != "" { + return k.getClientsetForPath(k.kubeConfigPath) + } + + return k.getClientset() +} + +// getClientOrCached returns either a new instance +// of kubernetes client or its cached copy +func (k *Kubeclient) getClientOrCached() (*clientset.Clientset, error) { + if k.clientset != nil { + return k.clientset, nil + } + + c, err := k.getClientsetForPathOrDirect() + if err != nil { + return nil, + errors.Wrapf( + err, + "failed to get clientset", + ) + } + + k.clientset = c + return k.clientset, nil +} + +// Create creates a zfs node instance +// in kubernetes cluster +func (k *Kubeclient) Create(node *apis.ZFSNode) (*apis.ZFSNode, error) { + if node == nil { + return nil, + errors.New( + "failed to create zfs node: nil node object", + ) + } + cs, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to create zfs node {%s} in namespace {%s}", + node.Name, + k.namespace, + ) + } + + return k.create(cs, node, k.namespace) +} + +// Get returns zfs node object for given name +func (k *Kubeclient) Get( + name string, + opts metav1.GetOptions, +) (*apis.ZFSNode, error) { + if name == "" { + return nil, + errors.New( + "failed to get zfs node: missing zfs node name", + ) + } + + cli, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to get zfs node {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return k.get(cli, name, k.namespace, opts) +} + +// GetRaw returns zfs node instance +// in bytes +func (k *Kubeclient) GetRaw( + name string, + opts metav1.GetOptions, +) ([]byte, error) { + if name == "" { + return nil, errors.New( + "failed to get raw zfs node: missing node name", + ) + } + csiv, err := k.Get(name, opts) + if err != nil { + return nil, errors.Wrapf( + err, + "failed to get zfs node {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return json.Marshal(csiv) +} + +// List returns a list of zfs node +// instances present in kubernetes cluster +func (k *Kubeclient) List(opts metav1.ListOptions) (*apis.ZFSNodeList, error) { + cli, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to list zfs nodes in namespace {%s}", + k.namespace, + ) + } + + return k.list(cli, k.namespace, opts) +} + +// Delete deletes the zfs node from +// kubernetes +func (k *Kubeclient) Delete(name string) error { + if name == "" { + return errors.New( + "failed to delete zfsnode: missing node name", + ) + } + cli, err := k.getClientOrCached() + if err != nil { + return errors.Wrapf( + err, + "failed to delete zfsnode {%s} in namespace {%s}", + name, + k.namespace, + ) + } + + return k.del(cli, name, k.namespace, &metav1.DeleteOptions{}) +} + +// Update updates this zfs node instance +// against kubernetes cluster +func (k *Kubeclient) Update(node *apis.ZFSNode) (*apis.ZFSNode, error) { + if node == nil { + return nil, + errors.New( + "failed to update zfsnode: nil node object", + ) + } + + cs, err := k.getClientOrCached() + if err != nil { + return nil, errors.Wrapf( + err, + "failed to update zfsnode {%s} in namespace {%s}", + node.Name, + node.Namespace, + ) + } + + return k.update(cs, node, k.namespace) +} diff --git a/pkg/builder/nodebuilder/node.go b/pkg/builder/nodebuilder/node.go new file mode 100644 index 0000000..3d3a359 --- /dev/null +++ b/pkg/builder/nodebuilder/node.go @@ -0,0 +1,122 @@ +/* +Copyright 2021 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package nodebuilder + +import ( + apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// Builder is the builder object for ZFSNode +type Builder struct { + node *ZFSNode + errs []error +} + +// ZFSNode is a wrapper over +// ZFSNode API instance +type ZFSNode struct { + // ZFSVolume object + Object *apis.ZFSNode +} + +// From returns a new instance of +// zfs volume +func From(node *apis.ZFSNode) *ZFSNode { + return &ZFSNode{ + Object: node, + } +} + +// NewBuilder returns new instance of Builder +func NewBuilder() *Builder { + return &Builder{ + node: &ZFSNode{ + Object: &apis.ZFSNode{}, + }, + } +} + +// BuildFrom returns new instance of Builder +// from the provided api instance +func BuildFrom(node *apis.ZFSNode) *Builder { + if node == nil { + b := NewBuilder() + b.errs = append( + b.errs, + errors.New("failed to build zfs node object: nil node"), + ) + return b + } + return &Builder{ + node: &ZFSNode{ + Object: node, + }, + } +} + +// WithNamespace sets the namespace of ZFSNode +func (b *Builder) WithNamespace(namespace string) *Builder { + if namespace == "" { + b.errs = append( + b.errs, + errors.New( + "failed to build zfs node object: missing namespace", + ), + ) + return b + } + b.node.Object.Namespace = namespace + return b +} + +// WithName sets the name of ZFSNode +func (b *Builder) WithName(name string) *Builder { + if name == "" { + b.errs = append( + b.errs, + errors.New( + "failed to build zfs node object: missing name", + ), + ) + return b + } + b.node.Object.Name = name + return b +} + +// WithPools sets the pools of ZFSNode +func (b *Builder) WithPools(pools []apis.Pool) *Builder { + b.node.Object.Pools = pools + return b +} + +// WithOwnerReferences sets the owner references of ZFSNode +func (b *Builder) WithOwnerReferences(ownerRefs ...metav1.OwnerReference) *Builder { + b.node.Object.OwnerReferences = ownerRefs + return b +} + +// Build returns ZFSNode API object +func (b *Builder) Build() (*apis.ZFSNode, error) { + if len(b.errs) > 0 { + return nil, errors.Errorf("%+v", b.errs) + } + + return b.node.Object, nil +} diff --git a/pkg/driver/agent.go b/pkg/driver/agent.go index 2c57399..e27622b 100644 --- a/pkg/driver/agent.go +++ b/pkg/driver/agent.go @@ -30,6 +30,7 @@ import ( "github.com/openebs/zfs-localpv/pkg/mgmt/restore" "github.com/openebs/zfs-localpv/pkg/mgmt/snapshot" "github.com/openebs/zfs-localpv/pkg/mgmt/volume" + "github.com/openebs/zfs-localpv/pkg/mgmt/zfsnode" "github.com/openebs/zfs-localpv/pkg/zfs" "golang.org/x/net/context" "golang.org/x/sys/unix" @@ -54,6 +55,14 @@ func NewNode(d *CSIDriver) csi.NodeServer { // set up signals so we handle the first shutdown signal gracefully stopCh := signals.SetupSignalHandler() + // start the zfsnode resource watcher + go func() { + err := zfsnode.Start(&ControllerMutex, stopCh) + if err != nil { + klog.Fatalf("Failed to start ZFS node controller: %s", err.Error()) + } + }() + // start the zfsvolume watcher go func() { err := volume.Start(&ControllerMutex, stopCh) diff --git a/pkg/driver/controller.go b/pkg/driver/controller.go index a635e10..cdf0899 100644 --- a/pkg/driver/controller.go +++ b/pkg/driver/controller.go @@ -22,11 +22,22 @@ import ( "strings" "time" + k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" + zfsapi "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset" + informers "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions" + kubeinformers "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" + "github.com/container-storage-interface/spec/lib/go/csi" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" k8serror "k8s.io/apimachinery/pkg/api/errors" + apimeta "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" "k8s.io/klog" errors "github.com/openebs/lib-csi/pkg/common/errors" @@ -52,15 +63,69 @@ const ( type controller struct { driver *CSIDriver capabilities []*csi.ControllerServiceCapability + + indexedLabel string + + k8sNodeInformer cache.SharedIndexInformer + zfsNodeInformer cache.SharedIndexInformer } // NewController returns a new instance // of CSI controller func NewController(d *CSIDriver) csi.ControllerServer { - return &controller{ + ctrl := &controller{ driver: d, capabilities: newControllerCapabilities(), } + if err := ctrl.init(); err != nil { + klog.Fatalf("init controller: %v", err) + } + + return ctrl +} + +func (cs *controller) init() error { + cfg, err := k8sapi.Config().Get() + if err != nil { + return errors.Wrapf(err, "failed to build kubeconfig") + } + + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "failed to build k8s clientset") + } + + openebsClient, err := clientset.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "failed to build openebs clientset") + } + + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, 0) + openebsInformerfactory := informers.NewSharedInformerFactoryWithOptions(openebsClient, + 0, informers.WithNamespace(zfs.OpenEBSNamespace)) + + // set up signals so we handle the first shutdown signal gracefully + stopCh := signals.SetupSignalHandler() + + cs.k8sNodeInformer = kubeInformerFactory.Core().V1().Nodes().Informer() + cs.zfsNodeInformer = openebsInformerfactory.Zfs().V1().ZFSNodes().Informer() + + if err = cs.zfsNodeInformer.AddIndexers(map[string]cache.IndexFunc{ + LabelIndexName(cs.indexedLabel): LabelIndexFunc(cs.indexedLabel), + }); err != nil { + return errors.Wrapf(err, "failed to add index on label %v", cs.indexedLabel) + } + + go cs.k8sNodeInformer.Run(stopCh) + go cs.zfsNodeInformer.Run(stopCh) + + // wait for all the caches to be populated. + klog.Info("waiting for k8s & zfs node informer caches to be synced") + cache.WaitForCacheSync(stopCh, + cs.k8sNodeInformer.HasSynced, + cs.zfsNodeInformer.HasSynced) + klog.Info("synced k8s & zfs node informer caches") + return nil } // SupportedVolumeCapabilityAccessModes contains the list of supported access @@ -784,7 +849,7 @@ func (cs *controller) ControllerPublishVolume( } // GetCapacity return the capacity of the -// given volume +// given node topology segment. // // This implements csi.ControllerServer func (cs *controller) GetCapacity( @@ -792,7 +857,89 @@ func (cs *controller) GetCapacity( req *csi.GetCapacityRequest, ) (*csi.GetCapacityResponse, error) { - return nil, status.Error(codes.Unimplemented, "") + var segments map[string]string + if topology := req.GetAccessibleTopology(); topology != nil { + segments = topology.Segments + } + nodeNames, err := cs.filterNodesByTopology(segments) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + + zfsNodesCache := cs.zfsNodeInformer.GetIndexer() + params := req.GetParameters() + poolParam := helpers.GetInsensitiveParameter(¶ms, "poolname") + + var availableCapacity int64 + for _, nodeName := range nodeNames { + v, exists, err := zfsNodesCache.GetByKey(zfs.OpenEBSNamespace + "/" + nodeName) + if err != nil { + klog.Warning("unexpected error after querying the zfsNode informer cache") + continue + } + if !exists { + continue + } + zfsNode := v.(*zfsapi.ZFSNode) + // rather than summing all free capacity, we are calculating maximum + // zv size that gets fit in given pool. + // See https://github.com/kubernetes/enhancements/tree/master/keps/sig-storage/1472-storage-capacity-tracking#available-capacity-vs-maximum-volume-size & + // https://github.com/container-storage-interface/spec/issues/432 for more details + for _, zpool := range zfsNode.Pools { + if zpool.Name != poolParam { + continue + } + freeCapacity := zpool.Free.Value() + if availableCapacity < freeCapacity { + availableCapacity = freeCapacity + } + } + } + + return &csi.GetCapacityResponse{ + AvailableCapacity: availableCapacity, + }, nil +} + +func (cs *controller) filterNodesByTopology(segments map[string]string) ([]string, error) { + nodesCache := cs.k8sNodeInformer.GetIndexer() + if len(segments) == 0 { + return nodesCache.ListKeys(), nil + } + + filterNodes := func(vs []interface{}) ([]string, error) { + var names []string + selector := labels.SelectorFromSet(segments) + for _, v := range vs { + meta, err := apimeta.Accessor(v) + if err != nil { + return nil, err + } + if selector.Matches(labels.Set(meta.GetLabels())) { + names = append(names, meta.GetName()) + } + } + return names, nil + } + + // first see if we need to filter the informer cache by indexed label, + // so that we don't need to iterate over all the nodes for performance + // reasons in large cluster. + indexName := LabelIndexName(cs.indexedLabel) + if _, ok := nodesCache.GetIndexers()[indexName]; !ok { + // run through all the nodes in case indexer doesn't exists. + return filterNodes(nodesCache.List()) + } + + if segValue, ok := segments[cs.indexedLabel]; ok { + vs, err := nodesCache.ByIndex(indexName, segValue) + if err != nil { + return nil, errors.Wrapf(err, "query indexed store indexName=%v indexKey=%v", + indexName, segValue) + } + return filterNodes(vs) + } + return filterNodes(nodesCache.List()) } // ListVolumes lists all the volumes @@ -862,6 +1009,7 @@ func newControllerCapabilities() []*csi.ControllerServiceCapability { csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT, csi.ControllerServiceCapability_RPC_CLONE_VOLUME, csi.ControllerServiceCapability_RPC_EXPAND_VOLUME, + csi.ControllerServiceCapability_RPC_GET_CAPACITY, } { capabilities = append(capabilities, fromType(cap)) } @@ -914,3 +1062,24 @@ func (cs *controller) validateVolumeCreateReq(req *csi.CreateVolumeRequest) erro } return nil } + +// LabelIndexName add prefix for label index. +func LabelIndexName(label string) string { + return "l:" + label +} + +// LabelIndexFunc defines index values for given label. +func LabelIndexFunc(label string) cache.IndexFunc { + return func(obj interface{}) ([]string, error) { + meta, err := apimeta.Accessor(obj) + if err != nil { + return nil, fmt.Errorf( + "k8s api object type (%T) doesn't implements metav1.Object interface: %v", obj, err) + } + var vs []string + if v, ok := meta.GetLabels()[label]; ok { + vs = append(vs, v) + } + return vs, nil + } +} diff --git a/pkg/equality/semantic.go b/pkg/equality/semantic.go new file mode 100644 index 0000000..3da98f2 --- /dev/null +++ b/pkg/equality/semantic.go @@ -0,0 +1,55 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package equality + +import ( + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/conversion" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" +) + +// Since we are using quite older version of k8s.io/* libraries, we need take up +// their upgrades as part of separate pull request. +// +// Copied from https://github.com/kubernetes/apimachinery/blob/c93b0f84892eb6bcbc80b312ae70729d8168bc7e/pkg/api/equality/semantic.go +// More details at - https://github.com/kubernetes/apimachinery/issues/75 + +// Semantic can do semantic deep equality checks for api objects. +// Example: apiequality.Semantic.DeepEqual(aPod, aPodWithNonNilButEmptyMaps) == true +var Semantic = conversion.EqualitiesOrDie( + func(a, b resource.Quantity) bool { + // Ignore formatting, only care that numeric value stayed the same. + // TODO: if we decide it's important, it should be safe to start comparing the format. + // + // Uninitialized quantities are equivalent to 0 quantities. + return a.Cmp(b) == 0 + }, + func(a, b metav1.MicroTime) bool { + return a.UTC() == b.UTC() + }, + func(a, b metav1.Time) bool { + return a.UTC() == b.UTC() + }, + func(a, b labels.Selector) bool { + return a.String() == b.String() + }, + func(a, b fields.Selector) bool { + return a.String() == b.String() + }, +) diff --git a/pkg/generated/clientset/internalclientset/typed/zfs/v1/fake/fake_zfs_client.go b/pkg/generated/clientset/internalclientset/typed/zfs/v1/fake/fake_zfs_client.go index 21cb401..5e83463 100644 --- a/pkg/generated/clientset/internalclientset/typed/zfs/v1/fake/fake_zfs_client.go +++ b/pkg/generated/clientset/internalclientset/typed/zfs/v1/fake/fake_zfs_client.go @@ -32,6 +32,10 @@ func (c *FakeZfsV1) ZFSBackups(namespace string) v1.ZFSBackupInterface { return &FakeZFSBackups{c, namespace} } +func (c *FakeZfsV1) ZFSNodes(namespace string) v1.ZFSNodeInterface { + return &FakeZFSNodes{c, namespace} +} + func (c *FakeZfsV1) ZFSRestores(namespace string) v1.ZFSRestoreInterface { return &FakeZFSRestores{c, namespace} } diff --git a/pkg/generated/clientset/internalclientset/typed/zfs/v1/fake/fake_zfsnode.go b/pkg/generated/clientset/internalclientset/typed/zfs/v1/fake/fake_zfsnode.go new file mode 100644 index 0000000..202b2fd --- /dev/null +++ b/pkg/generated/clientset/internalclientset/typed/zfs/v1/fake/fake_zfsnode.go @@ -0,0 +1,130 @@ +/* +Copyright 2019 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package fake + +import ( + "context" + + zfsv1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + labels "k8s.io/apimachinery/pkg/labels" + schema "k8s.io/apimachinery/pkg/runtime/schema" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + testing "k8s.io/client-go/testing" +) + +// FakeZFSNodes implements ZFSNodeInterface +type FakeZFSNodes struct { + Fake *FakeZfsV1 + ns string +} + +var zfsnodesResource = schema.GroupVersionResource{Group: "zfs.openebs.io", Version: "v1", Resource: "zfsnodes"} + +var zfsnodesKind = schema.GroupVersionKind{Group: "zfs.openebs.io", Version: "v1", Kind: "ZFSNode"} + +// Get takes name of the zFSNode, and returns the corresponding zFSNode object, and an error if there is any. +func (c *FakeZFSNodes) Get(ctx context.Context, name string, options v1.GetOptions) (result *zfsv1.ZFSNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewGetAction(zfsnodesResource, c.ns, name), &zfsv1.ZFSNode{}) + + if obj == nil { + return nil, err + } + return obj.(*zfsv1.ZFSNode), err +} + +// List takes label and field selectors, and returns the list of ZFSNodes that match those selectors. +func (c *FakeZFSNodes) List(ctx context.Context, opts v1.ListOptions) (result *zfsv1.ZFSNodeList, err error) { + obj, err := c.Fake. + Invokes(testing.NewListAction(zfsnodesResource, zfsnodesKind, c.ns, opts), &zfsv1.ZFSNodeList{}) + + if obj == nil { + return nil, err + } + + label, _, _ := testing.ExtractFromListOptions(opts) + if label == nil { + label = labels.Everything() + } + list := &zfsv1.ZFSNodeList{ListMeta: obj.(*zfsv1.ZFSNodeList).ListMeta} + for _, item := range obj.(*zfsv1.ZFSNodeList).Items { + if label.Matches(labels.Set(item.Labels)) { + list.Items = append(list.Items, item) + } + } + return list, err +} + +// Watch returns a watch.Interface that watches the requested zFSNodes. +func (c *FakeZFSNodes) Watch(ctx context.Context, opts v1.ListOptions) (watch.Interface, error) { + return c.Fake. + InvokesWatch(testing.NewWatchAction(zfsnodesResource, c.ns, opts)) + +} + +// Create takes the representation of a zFSNode and creates it. Returns the server's representation of the zFSNode, and an error, if there is any. +func (c *FakeZFSNodes) Create(ctx context.Context, zFSNode *zfsv1.ZFSNode, opts v1.CreateOptions) (result *zfsv1.ZFSNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewCreateAction(zfsnodesResource, c.ns, zFSNode), &zfsv1.ZFSNode{}) + + if obj == nil { + return nil, err + } + return obj.(*zfsv1.ZFSNode), err +} + +// Update takes the representation of a zFSNode and updates it. Returns the server's representation of the zFSNode, and an error, if there is any. +func (c *FakeZFSNodes) Update(ctx context.Context, zFSNode *zfsv1.ZFSNode, opts v1.UpdateOptions) (result *zfsv1.ZFSNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewUpdateAction(zfsnodesResource, c.ns, zFSNode), &zfsv1.ZFSNode{}) + + if obj == nil { + return nil, err + } + return obj.(*zfsv1.ZFSNode), err +} + +// Delete takes name of the zFSNode and deletes it. Returns an error if one occurs. +func (c *FakeZFSNodes) Delete(ctx context.Context, name string, opts v1.DeleteOptions) error { + _, err := c.Fake. + Invokes(testing.NewDeleteAction(zfsnodesResource, c.ns, name), &zfsv1.ZFSNode{}) + + return err +} + +// DeleteCollection deletes a collection of objects. +func (c *FakeZFSNodes) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error { + action := testing.NewDeleteCollectionAction(zfsnodesResource, c.ns, listOpts) + + _, err := c.Fake.Invokes(action, &zfsv1.ZFSNodeList{}) + return err +} + +// Patch applies the patch and returns the patched zFSNode. +func (c *FakeZFSNodes) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts v1.PatchOptions, subresources ...string) (result *zfsv1.ZFSNode, err error) { + obj, err := c.Fake. + Invokes(testing.NewPatchSubresourceAction(zfsnodesResource, c.ns, name, pt, data, subresources...), &zfsv1.ZFSNode{}) + + if obj == nil { + return nil, err + } + return obj.(*zfsv1.ZFSNode), err +} diff --git a/pkg/generated/clientset/internalclientset/typed/zfs/v1/generated_expansion.go b/pkg/generated/clientset/internalclientset/typed/zfs/v1/generated_expansion.go index bdfa5a5..eb01424 100644 --- a/pkg/generated/clientset/internalclientset/typed/zfs/v1/generated_expansion.go +++ b/pkg/generated/clientset/internalclientset/typed/zfs/v1/generated_expansion.go @@ -20,6 +20,8 @@ package v1 type ZFSBackupExpansion interface{} +type ZFSNodeExpansion interface{} + type ZFSRestoreExpansion interface{} type ZFSSnapshotExpansion interface{} diff --git a/pkg/generated/clientset/internalclientset/typed/zfs/v1/zfs_client.go b/pkg/generated/clientset/internalclientset/typed/zfs/v1/zfs_client.go index e2f4daf..fc01176 100644 --- a/pkg/generated/clientset/internalclientset/typed/zfs/v1/zfs_client.go +++ b/pkg/generated/clientset/internalclientset/typed/zfs/v1/zfs_client.go @@ -27,6 +27,7 @@ import ( type ZfsV1Interface interface { RESTClient() rest.Interface ZFSBackupsGetter + ZFSNodesGetter ZFSRestoresGetter ZFSSnapshotsGetter ZFSVolumesGetter @@ -41,6 +42,10 @@ func (c *ZfsV1Client) ZFSBackups(namespace string) ZFSBackupInterface { return newZFSBackups(c, namespace) } +func (c *ZfsV1Client) ZFSNodes(namespace string) ZFSNodeInterface { + return newZFSNodes(c, namespace) +} + func (c *ZfsV1Client) ZFSRestores(namespace string) ZFSRestoreInterface { return newZFSRestores(c, namespace) } diff --git a/pkg/generated/clientset/internalclientset/typed/zfs/v1/zfsnode.go b/pkg/generated/clientset/internalclientset/typed/zfs/v1/zfsnode.go new file mode 100644 index 0000000..58754e2 --- /dev/null +++ b/pkg/generated/clientset/internalclientset/typed/zfs/v1/zfsnode.go @@ -0,0 +1,178 @@ +/* +Copyright 2019 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by client-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + "time" + + v1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + scheme "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset/scheme" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + types "k8s.io/apimachinery/pkg/types" + watch "k8s.io/apimachinery/pkg/watch" + rest "k8s.io/client-go/rest" +) + +// ZFSNodesGetter has a method to return a ZFSNodeInterface. +// A group's client should implement this interface. +type ZFSNodesGetter interface { + ZFSNodes(namespace string) ZFSNodeInterface +} + +// ZFSNodeInterface has methods to work with ZFSNode resources. +type ZFSNodeInterface interface { + Create(ctx context.Context, zFSNode *v1.ZFSNode, opts metav1.CreateOptions) (*v1.ZFSNode, error) + Update(ctx context.Context, zFSNode *v1.ZFSNode, opts metav1.UpdateOptions) (*v1.ZFSNode, error) + Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error + DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error + Get(ctx context.Context, name string, opts metav1.GetOptions) (*v1.ZFSNode, error) + List(ctx context.Context, opts metav1.ListOptions) (*v1.ZFSNodeList, error) + Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.ZFSNode, err error) + ZFSNodeExpansion +} + +// zFSNodes implements ZFSNodeInterface +type zFSNodes struct { + client rest.Interface + ns string +} + +// newZFSNodes returns a ZFSNodes +func newZFSNodes(c *ZfsV1Client, namespace string) *zFSNodes { + return &zFSNodes{ + client: c.RESTClient(), + ns: namespace, + } +} + +// Get takes name of the zFSNode, and returns the corresponding zFSNode object, and an error if there is any. +func (c *zFSNodes) Get(ctx context.Context, name string, options metav1.GetOptions) (result *v1.ZFSNode, err error) { + result = &v1.ZFSNode{} + err = c.client.Get(). + Namespace(c.ns). + Resource("zfsnodes"). + Name(name). + VersionedParams(&options, scheme.ParameterCodec). + Do(ctx). + Into(result) + return +} + +// List takes label and field selectors, and returns the list of ZFSNodes that match those selectors. +func (c *zFSNodes) List(ctx context.Context, opts metav1.ListOptions) (result *v1.ZFSNodeList, err error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + result = &v1.ZFSNodeList{} + err = c.client.Get(). + Namespace(c.ns). + Resource("zfsnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Do(ctx). + Into(result) + return +} + +// Watch returns a watch.Interface that watches the requested zFSNodes. +func (c *zFSNodes) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { + var timeout time.Duration + if opts.TimeoutSeconds != nil { + timeout = time.Duration(*opts.TimeoutSeconds) * time.Second + } + opts.Watch = true + return c.client.Get(). + Namespace(c.ns). + Resource("zfsnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Timeout(timeout). + Watch(ctx) +} + +// Create takes the representation of a zFSNode and creates it. Returns the server's representation of the zFSNode, and an error, if there is any. +func (c *zFSNodes) Create(ctx context.Context, zFSNode *v1.ZFSNode, opts metav1.CreateOptions) (result *v1.ZFSNode, err error) { + result = &v1.ZFSNode{} + err = c.client.Post(). + Namespace(c.ns). + Resource("zfsnodes"). + VersionedParams(&opts, scheme.ParameterCodec). + Body(zFSNode). + Do(ctx). + Into(result) + return +} + +// Update takes the representation of a zFSNode and updates it. Returns the server's representation of the zFSNode, and an error, if there is any. +func (c *zFSNodes) Update(ctx context.Context, zFSNode *v1.ZFSNode, opts metav1.UpdateOptions) (result *v1.ZFSNode, err error) { + result = &v1.ZFSNode{} + err = c.client.Put(). + Namespace(c.ns). + Resource("zfsnodes"). + Name(zFSNode.Name). + VersionedParams(&opts, scheme.ParameterCodec). + Body(zFSNode). + Do(ctx). + Into(result) + return +} + +// Delete takes name of the zFSNode and deletes it. Returns an error if one occurs. +func (c *zFSNodes) Delete(ctx context.Context, name string, opts metav1.DeleteOptions) error { + return c.client.Delete(). + Namespace(c.ns). + Resource("zfsnodes"). + Name(name). + Body(&opts). + Do(ctx). + Error() +} + +// DeleteCollection deletes a collection of objects. +func (c *zFSNodes) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOpts metav1.ListOptions) error { + var timeout time.Duration + if listOpts.TimeoutSeconds != nil { + timeout = time.Duration(*listOpts.TimeoutSeconds) * time.Second + } + return c.client.Delete(). + Namespace(c.ns). + Resource("zfsnodes"). + VersionedParams(&listOpts, scheme.ParameterCodec). + Timeout(timeout). + Body(&opts). + Do(ctx). + Error() +} + +// Patch applies the patch and returns the patched zFSNode. +func (c *zFSNodes) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (result *v1.ZFSNode, err error) { + result = &v1.ZFSNode{} + err = c.client.Patch(pt). + Namespace(c.ns). + Resource("zfsnodes"). + Name(name). + SubResource(subresources...). + VersionedParams(&opts, scheme.ParameterCodec). + Body(data). + Do(ctx). + Into(result) + return +} diff --git a/pkg/generated/informer/externalversions/generic.go b/pkg/generated/informer/externalversions/generic.go index 5798a31..0038f00 100644 --- a/pkg/generated/informer/externalversions/generic.go +++ b/pkg/generated/informer/externalversions/generic.go @@ -55,6 +55,8 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource // Group=zfs.openebs.io, Version=v1 case v1.SchemeGroupVersion.WithResource("zfsbackups"): return &genericInformer{resource: resource.GroupResource(), informer: f.Zfs().V1().ZFSBackups().Informer()}, nil + case v1.SchemeGroupVersion.WithResource("zfsnodes"): + return &genericInformer{resource: resource.GroupResource(), informer: f.Zfs().V1().ZFSNodes().Informer()}, nil case v1.SchemeGroupVersion.WithResource("zfsrestores"): return &genericInformer{resource: resource.GroupResource(), informer: f.Zfs().V1().ZFSRestores().Informer()}, nil case v1.SchemeGroupVersion.WithResource("zfssnapshots"): diff --git a/pkg/generated/informer/externalversions/zfs/v1/interface.go b/pkg/generated/informer/externalversions/zfs/v1/interface.go index e466bae..8bf6fb6 100644 --- a/pkg/generated/informer/externalversions/zfs/v1/interface.go +++ b/pkg/generated/informer/externalversions/zfs/v1/interface.go @@ -26,6 +26,8 @@ import ( type Interface interface { // ZFSBackups returns a ZFSBackupInformer. ZFSBackups() ZFSBackupInformer + // ZFSNodes returns a ZFSNodeInformer. + ZFSNodes() ZFSNodeInformer // ZFSRestores returns a ZFSRestoreInformer. ZFSRestores() ZFSRestoreInformer // ZFSSnapshots returns a ZFSSnapshotInformer. @@ -50,6 +52,11 @@ func (v *version) ZFSBackups() ZFSBackupInformer { return &zFSBackupInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} } +// ZFSNodes returns a ZFSNodeInformer. +func (v *version) ZFSNodes() ZFSNodeInformer { + return &zFSNodeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} +} + // ZFSRestores returns a ZFSRestoreInformer. func (v *version) ZFSRestores() ZFSRestoreInformer { return &zFSRestoreInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions} diff --git a/pkg/generated/informer/externalversions/zfs/v1/zfsnode.go b/pkg/generated/informer/externalversions/zfs/v1/zfsnode.go new file mode 100644 index 0000000..0c7a009 --- /dev/null +++ b/pkg/generated/informer/externalversions/zfs/v1/zfsnode.go @@ -0,0 +1,90 @@ +/* +Copyright 2019 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by informer-gen. DO NOT EDIT. + +package v1 + +import ( + "context" + time "time" + + zfsv1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + internalclientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset" + internalinterfaces "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions/internalinterfaces" + v1 "github.com/openebs/zfs-localpv/pkg/generated/lister/zfs/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + runtime "k8s.io/apimachinery/pkg/runtime" + watch "k8s.io/apimachinery/pkg/watch" + cache "k8s.io/client-go/tools/cache" +) + +// ZFSNodeInformer provides access to a shared informer and lister for +// ZFSNodes. +type ZFSNodeInformer interface { + Informer() cache.SharedIndexInformer + Lister() v1.ZFSNodeLister +} + +type zFSNodeInformer struct { + factory internalinterfaces.SharedInformerFactory + tweakListOptions internalinterfaces.TweakListOptionsFunc + namespace string +} + +// NewZFSNodeInformer constructs a new informer for ZFSNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewZFSNodeInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { + return NewFilteredZFSNodeInformer(client, namespace, resyncPeriod, indexers, nil) +} + +// NewFilteredZFSNodeInformer constructs a new informer for ZFSNode type. +// Always prefer using an informer factory to get a shared informer instead of getting an independent +// one. This reduces memory footprint and number of connections to the server. +func NewFilteredZFSNodeInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer { + return cache.NewSharedIndexInformer( + &cache.ListWatch{ + ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ZfsV1().ZFSNodes(namespace).List(context.TODO(), options) + }, + WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { + if tweakListOptions != nil { + tweakListOptions(&options) + } + return client.ZfsV1().ZFSNodes(namespace).Watch(context.TODO(), options) + }, + }, + &zfsv1.ZFSNode{}, + resyncPeriod, + indexers, + ) +} + +func (f *zFSNodeInformer) defaultInformer(client internalclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { + return NewFilteredZFSNodeInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions) +} + +func (f *zFSNodeInformer) Informer() cache.SharedIndexInformer { + return f.factory.InformerFor(&zfsv1.ZFSNode{}, f.defaultInformer) +} + +func (f *zFSNodeInformer) Lister() v1.ZFSNodeLister { + return v1.NewZFSNodeLister(f.Informer().GetIndexer()) +} diff --git a/pkg/generated/lister/zfs/v1/expansion_generated.go b/pkg/generated/lister/zfs/v1/expansion_generated.go index 4842b2a..2e63608 100644 --- a/pkg/generated/lister/zfs/v1/expansion_generated.go +++ b/pkg/generated/lister/zfs/v1/expansion_generated.go @@ -26,6 +26,14 @@ type ZFSBackupListerExpansion interface{} // ZFSBackupNamespaceLister. type ZFSBackupNamespaceListerExpansion interface{} +// ZFSNodeListerExpansion allows custom methods to be added to +// ZFSNodeLister. +type ZFSNodeListerExpansion interface{} + +// ZFSNodeNamespaceListerExpansion allows custom methods to be added to +// ZFSNodeNamespaceLister. +type ZFSNodeNamespaceListerExpansion interface{} + // ZFSRestoreListerExpansion allows custom methods to be added to // ZFSRestoreLister. type ZFSRestoreListerExpansion interface{} diff --git a/pkg/generated/lister/zfs/v1/zfsnode.go b/pkg/generated/lister/zfs/v1/zfsnode.go new file mode 100644 index 0000000..75b76d9 --- /dev/null +++ b/pkg/generated/lister/zfs/v1/zfsnode.go @@ -0,0 +1,99 @@ +/* +Copyright 2019 The OpenEBS Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by lister-gen. DO NOT EDIT. + +package v1 + +import ( + v1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/tools/cache" +) + +// ZFSNodeLister helps list ZFSNodes. +// All objects returned here must be treated as read-only. +type ZFSNodeLister interface { + // List lists all ZFSNodes in the indexer. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1.ZFSNode, err error) + // ZFSNodes returns an object that can list and get ZFSNodes. + ZFSNodes(namespace string) ZFSNodeNamespaceLister + ZFSNodeListerExpansion +} + +// zFSNodeLister implements the ZFSNodeLister interface. +type zFSNodeLister struct { + indexer cache.Indexer +} + +// NewZFSNodeLister returns a new ZFSNodeLister. +func NewZFSNodeLister(indexer cache.Indexer) ZFSNodeLister { + return &zFSNodeLister{indexer: indexer} +} + +// List lists all ZFSNodes in the indexer. +func (s *zFSNodeLister) List(selector labels.Selector) (ret []*v1.ZFSNode, err error) { + err = cache.ListAll(s.indexer, selector, func(m interface{}) { + ret = append(ret, m.(*v1.ZFSNode)) + }) + return ret, err +} + +// ZFSNodes returns an object that can list and get ZFSNodes. +func (s *zFSNodeLister) ZFSNodes(namespace string) ZFSNodeNamespaceLister { + return zFSNodeNamespaceLister{indexer: s.indexer, namespace: namespace} +} + +// ZFSNodeNamespaceLister helps list and get ZFSNodes. +// All objects returned here must be treated as read-only. +type ZFSNodeNamespaceLister interface { + // List lists all ZFSNodes in the indexer for a given namespace. + // Objects returned here must be treated as read-only. + List(selector labels.Selector) (ret []*v1.ZFSNode, err error) + // Get retrieves the ZFSNode from the indexer for a given namespace and name. + // Objects returned here must be treated as read-only. + Get(name string) (*v1.ZFSNode, error) + ZFSNodeNamespaceListerExpansion +} + +// zFSNodeNamespaceLister implements the ZFSNodeNamespaceLister +// interface. +type zFSNodeNamespaceLister struct { + indexer cache.Indexer + namespace string +} + +// List lists all ZFSNodes in the indexer for a given namespace. +func (s zFSNodeNamespaceLister) List(selector labels.Selector) (ret []*v1.ZFSNode, err error) { + err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { + ret = append(ret, m.(*v1.ZFSNode)) + }) + return ret, err +} + +// Get retrieves the ZFSNode from the indexer for a given namespace and name. +func (s zFSNodeNamespaceLister) Get(name string) (*v1.ZFSNode, error) { + obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) + if err != nil { + return nil, err + } + if !exists { + return nil, errors.NewNotFound(v1.Resource("zfsnode"), name) + } + return obj.(*v1.ZFSNode), nil +} diff --git a/pkg/mgmt/zfsnode/builder.go b/pkg/mgmt/zfsnode/builder.go new file mode 100644 index 0000000..14ea0ed --- /dev/null +++ b/pkg/mgmt/zfsnode/builder.go @@ -0,0 +1,157 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package zfsnode + +import ( + "time" + + clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset" + openebsScheme "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset/scheme" + informers "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions" + listers "github.com/openebs/zfs-localpv/pkg/generated/lister/zfs/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes/scheme" + typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/record" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog" +) + +const controllerAgentName = "zfsnode-controller" + +// NodeController is the controller implementation for zfs node resources +type NodeController struct { + // kubeclientset is a standard kubernetes clientset + kubeclientset kubernetes.Interface + + // clientset is a openebs custom resource package generated for custom API group. + clientset clientset.Interface + + NodeLister listers.ZFSNodeLister + + // NodeSynced is used for caches sync to get populated + NodeSynced cache.InformerSynced + + // workqueue is a rate limited work queue. This is used to queue work to be + // processed instead of performing it as soon as a change happens. This + // means we can ensure we only process a fixed amount of resources at a + // time, and makes it easy to ensure we are never processing the same item + // simultaneously in two different workers. + workqueue workqueue.RateLimitingInterface + + // recorder is an event recorder for recording Event resources to the + // Kubernetes API. + recorder record.EventRecorder + + // pollInterval controls the polling frequency of syncing up the vg metadata. + pollInterval time.Duration + + // ownerRef is used to set the owner reference to zfsnode objects. + ownerRef metav1.OwnerReference +} + +// NodeControllerBuilder is the builder object for controller. +type NodeControllerBuilder struct { + NodeController *NodeController +} + +// NewNodeControllerBuilder returns an empty instance of controller builder. +func NewNodeControllerBuilder() *NodeControllerBuilder { + return &NodeControllerBuilder{ + NodeController: &NodeController{}, + } +} + +// withKubeClient fills kube client to controller object. +func (cb *NodeControllerBuilder) withKubeClient(ks kubernetes.Interface) *NodeControllerBuilder { + cb.NodeController.kubeclientset = ks + return cb +} + +// withOpenEBSClient fills openebs client to controller object. +func (cb *NodeControllerBuilder) withOpenEBSClient(cs clientset.Interface) *NodeControllerBuilder { + cb.NodeController.clientset = cs + return cb +} + +// withNodeLister fills Node lister to controller object. +func (cb *NodeControllerBuilder) withNodeLister(sl informers.SharedInformerFactory) *NodeControllerBuilder { + NodeInformer := sl.Zfs().V1().ZFSNodes() + cb.NodeController.NodeLister = NodeInformer.Lister() + return cb +} + +// withNodeSynced adds object sync information in cache to controller object. +func (cb *NodeControllerBuilder) withNodeSynced(sl informers.SharedInformerFactory) *NodeControllerBuilder { + NodeInformer := sl.Zfs().V1().ZFSNodes() + cb.NodeController.NodeSynced = NodeInformer.Informer().HasSynced + return cb +} + +// withWorkqueue adds workqueue to controller object. +func (cb *NodeControllerBuilder) withWorkqueueRateLimiting() *NodeControllerBuilder { + cb.NodeController.workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Node") + return cb +} + +// withRecorder adds recorder to controller object. +func (cb *NodeControllerBuilder) withRecorder(ks kubernetes.Interface) *NodeControllerBuilder { + klog.Infof("Creating event broadcaster") + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(klog.Infof) + eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ks.CoreV1().Events("")}) + recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName}) + cb.NodeController.recorder = recorder + return cb +} + +// withEventHandler adds event handlers controller object. +func (cb *NodeControllerBuilder) withEventHandler(cvcInformerFactory informers.SharedInformerFactory) *NodeControllerBuilder { + cvcInformer := cvcInformerFactory.Zfs().V1().ZFSNodes() + // Set up an event handler for when zfs node vg change. + // Note: rather than setting up the resync period at informer level, + // we are controlling the syncing based on pollInternal. See + // NodeController#Run func for more details. + cvcInformer.Informer().AddEventHandlerWithResyncPeriod(cache.ResourceEventHandlerFuncs{ + AddFunc: cb.NodeController.addNode, + UpdateFunc: cb.NodeController.updateNode, + DeleteFunc: cb.NodeController.deleteNode, + }, 0) + return cb +} + +func (cb *NodeControllerBuilder) withPollInterval(interval time.Duration) *NodeControllerBuilder { + cb.NodeController.pollInterval = interval + return cb +} + +func (cb *NodeControllerBuilder) withOwnerReference(ownerRef metav1.OwnerReference) *NodeControllerBuilder { + cb.NodeController.ownerRef = ownerRef + return cb +} + +// Build returns a controller instance. +func (cb *NodeControllerBuilder) Build() (*NodeController, error) { + err := openebsScheme.AddToScheme(scheme.Scheme) + if err != nil { + return nil, err + } + return cb.NodeController, nil +} diff --git a/pkg/mgmt/zfsnode/start.go b/pkg/mgmt/zfsnode/start.go new file mode 100644 index 0000000..395ae4f --- /dev/null +++ b/pkg/mgmt/zfsnode/start.go @@ -0,0 +1,109 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package zfsnode + +import ( + "context" + "sync" + "time" + + k8sapi "github.com/openebs/lib-csi/pkg/client/k8s" + clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset" + informers "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions" + "github.com/openebs/zfs-localpv/pkg/zfs" + "github.com/pkg/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/kubernetes" +) + +// Start starts the zfsnode controller. +func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error { + + // Get in cluster config + cfg, err := k8sapi.Config().Get() + if err != nil { + return errors.Wrap(err, "error building kubeconfig") + } + + // Building Kubernetes Clientset + kubeClient, err := kubernetes.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building kubernetes clientset") + } + + // Building OpenEBS Clientset + openebsClient, err := clientset.NewForConfig(cfg) + if err != nil { + return errors.Wrap(err, "error building openebs clientset") + } + + // setup watch only on node we are interested in. + nodeInformerFactory := informers.NewSharedInformerFactoryWithOptions( + openebsClient, 0, informers.WithNamespace(zfs.OpenEBSNamespace), + informers.WithTweakListOptions(func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermEqualSelector("metadata.name", zfs.NodeID).String() + })) + + k8sNode, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), zfs.NodeID, metav1.GetOptions{}) + if err != nil { + return errors.Wrapf(err, "fetch k8s node %s", zfs.NodeID) + } + isTrue := true + // as object returned by client go clears all TypeMeta from it. + nodeGVK := &schema.GroupVersionKind{ + Group: "", Version: "v1", Kind: "Node", + } + ownerRef := metav1.OwnerReference{ + APIVersion: nodeGVK.GroupVersion().String(), + Kind: nodeGVK.Kind, + Name: k8sNode.Name, + UID: k8sNode.GetUID(), + Controller: &isTrue, + } + + // Build() fn of all controllers calls AddToScheme to adds all types of this + // clientset into the given scheme. + // If multiple controllers happen to call this AddToScheme same time, + // it causes panic with error saying concurrent map access. + // This lock is used to serialize the AddToScheme call of all controllers. + controllerMtx.Lock() + + controller, err := NewNodeControllerBuilder(). + withKubeClient(kubeClient). + withOpenEBSClient(openebsClient). + withNodeSynced(nodeInformerFactory). + withNodeLister(nodeInformerFactory). + withRecorder(kubeClient). + withEventHandler(nodeInformerFactory). + withPollInterval(60 * time.Second). + withOwnerReference(ownerRef). + withWorkqueueRateLimiting().Build() + + // blocking call, can't use defer to release the lock + controllerMtx.Unlock() + + if err != nil { + return errors.Wrapf(err, "error building controller instance") + } + + nodeInformerFactory.Start(stopCh) + + // Threadiness defines the number of workers to be launched in Run function + return controller.Run(1, stopCh) +} diff --git a/pkg/mgmt/zfsnode/zfsnode.go b/pkg/mgmt/zfsnode/zfsnode.go new file mode 100644 index 0000000..8acf9bd --- /dev/null +++ b/pkg/mgmt/zfsnode/zfsnode.go @@ -0,0 +1,307 @@ +/* + Copyright © 2021 The OpenEBS Authors + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package zfsnode + +import ( + "fmt" + "reflect" + "time" + + apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" + "github.com/openebs/zfs-localpv/pkg/builder/nodebuilder" + "github.com/openebs/zfs-localpv/pkg/equality" + "github.com/openebs/zfs-localpv/pkg/zfs" + k8serror "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/klog" +) + +func (c *NodeController) listZFSPool() ([]apis.Pool, error) { + return zfs.ListZFSPool() +} + +// syncHandler compares the actual state with the desired, and attempts to +// converge the two. +func (c *NodeController) syncHandler(key string) error { + // Convert the namespace/name string into a distinct namespace and name + namespace, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + runtime.HandleError(fmt.Errorf("invalid resource key: %s", key)) + return nil + } + + return c.syncNode(namespace, name) +} + +// syncNode is the function which tries to converge to a desired state for the +// ZFSNode +func (c *NodeController) syncNode(namespace string, name string) error { + // Get the node resource with this namespace/name + cachedNode, err := c.NodeLister.ZFSNodes(namespace).Get(name) + if err != nil && !k8serror.IsNotFound(err) { + return err + } + var node *apis.ZFSNode + if cachedNode != nil { + node = cachedNode.DeepCopy() + } + + pools, err := c.listZFSPool() + if err != nil { + return err + } + + if node == nil { // if it doesn't exists, create zfs node object + if node, err = nodebuilder.NewBuilder(). + WithNamespace(namespace).WithName(name). + WithPools(pools). + WithOwnerReferences(c.ownerRef). + Build(); err != nil { + return err + } + + klog.Infof("zfs node controller: creating new node object for %+v", node) + if node, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Create(node); err != nil { + return fmt.Errorf("create zfs node %s/%s: %v", namespace, name, err) + } + klog.Infof("zfs node controller: created node object %s/%s", namespace, name) + return nil + } + + // zfs node already exists check if we need to update it. + var updateRequired bool + // validate if owner reference updated. + if ownerRefs, req := c.isOwnerRefsUpdateRequired(node.OwnerReferences); req { + klog.Infof("zfs node controller: node owner references updated current=%+v, required=%+v", + node.OwnerReferences, ownerRefs) + node.OwnerReferences = ownerRefs + updateRequired = true + } + + // validate if node pools are upto date. + if !equality.Semantic.DeepEqual(node.Pools, pools) { + klog.Infof("zfs node controller: node pools updated current=%+v, required=%+v", + node.Pools, pools) + node.Pools = pools + updateRequired = true + } + + if !updateRequired { + return nil + } + + klog.Infof("zfs node controller: updating node object with %+v", node) + if _, err = nodebuilder.NewKubeclient().WithNamespace(namespace).Update(node); err != nil { + return fmt.Errorf("update zfs node %s/%s: %v", namespace, name, err) + } + klog.Infof("zfs node controller: updated node object %s/%s", namespace, name) + + return nil +} + +// addNode is the add event handler for ZFSNode +func (c *NodeController) addNode(obj interface{}) { + node, ok := obj.(*apis.ZFSNode) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", obj)) + return + } + + klog.Infof("Got add event for zfs node %s/%s", node.Namespace, node.Name) + c.enqueueNode(node) +} + +// updateNode is the update event handler for ZFSNode +func (c *NodeController) updateNode(oldObj, newObj interface{}) { + newNode, ok := newObj.(*apis.ZFSNode) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get node object %#v", newNode)) + return + } + + klog.Infof("Got update event for zfs node %s/%s", newNode.Namespace, newNode.Name) + c.enqueueNode(newNode) +} + +// deleteNode is the delete event handler for ZFSNode +func (c *NodeController) deleteNode(obj interface{}) { + node, ok := obj.(*apis.ZFSNode) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj)) + return + } + node, ok = tombstone.Obj.(*apis.ZFSNode) + if !ok { + runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a ZFSNode %#v", obj)) + return + } + } + + klog.Infof("Got delete event for node %s/%s", node.Namespace, node.Name) + c.enqueueNode(node) +} + +// enqueueNode takes a ZFSNode resource and converts it into a namespace/name +// string which is then put onto the work queue. This method should *not* be +// passed resources of any type other than ZFSNode. +func (c *NodeController) enqueueNode(node *apis.ZFSNode) { + // node must exists in openebs namespace & must equal to the node id. + if node.Namespace != zfs.OpenEBSNamespace || + node.Name != zfs.NodeID { + klog.Warningf("skipping zfs node object %s/%s", node.Namespace, node.Name) + return + } + + key, err := cache.MetaNamespaceKeyFunc(node) + if err != nil { + runtime.HandleError(err) + return + } + c.workqueue.Add(key) +} + +// Run will set up the event handlers for types we are interested in, as well +// as syncing informer caches and starting workers. It will block until stopCh +// is closed, at which point it will shutdown the workqueue and wait for +// workers to finish processing their current work items. +func (c *NodeController) Run(threadiness int, stopCh <-chan struct{}) error { + defer runtime.HandleCrash() + defer c.workqueue.ShutDown() + + // Start the informer factories to begin populating the informer caches + klog.Info("Starting Node controller") + + // Wait for the k8s caches to be synced before starting workers + klog.Info("Waiting for informer caches to sync") + if ok := cache.WaitForCacheSync(stopCh, c.NodeSynced); !ok { + return fmt.Errorf("failed to wait for caches to sync") + } + + klog.Info("Starting Node workers") + // Launch worker to process Node resources + // Threadiness will decide the number of workers you want to launch to process work items from queue + for i := 0; i < threadiness; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + klog.Info("Started Node workers") + + timer := time.NewTimer(0) + defer timer.Stop() + for { + select { + case <-timer.C: + case <-stopCh: + klog.Info("Shutting down Node controller") + return nil + } + item := zfs.OpenEBSNamespace + "/" + zfs.NodeID + c.workqueue.Add(item) // add the item to worker queue. + timer.Reset(c.pollInterval) + } +} + +// runWorker is a long-running function that will continually call the +// processNextWorkItem function in order to read and process a message on the +// workqueue. +func (c *NodeController) runWorker() { + for c.processNextWorkItem() { + } +} + +// processNextWorkItem will read a single work item off the workqueue and +// attempt to process it, by calling the syncHandler. +func (c *NodeController) processNextWorkItem() bool { + obj, shutdown := c.workqueue.Get() + + if shutdown { + return false + } + + // We wrap this block in a func so we can defer c.workqueue.Done. + err := func(obj interface{}) error { + // We call Done here so the workqueue knows we have finished + // processing this item. We also must remember to call Forget if we + // do not want this work item being re-queued. For example, we do + // not call Forget if a transient error occurs, instead the item is + // put back on the workqueue and attempted again after a back-off + // period. + defer c.workqueue.Done(obj) + var key string + var ok bool + // We expect strings to come off the workqueue. These are of the + // form namespace/name. We do this as the delayed nature of the + // workqueue means the items in the informer cache may actually be + // more up to date that when the item was initially put onto the + // workqueue. + if key, ok = obj.(string); !ok { + // As the item in the workqueue is actually invalid, we call + // Forget here else we'd go into a loop of attempting to + // process a work item that is invalid. + c.workqueue.Forget(obj) + runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj)) + return nil + } + // Run the syncHandler, passing it the namespace/name string of the + // Node resource to be synced. + if err := c.syncHandler(key); err != nil { + // Put the item back on the workqueue to handle any transient errors. + c.workqueue.AddRateLimited(key) + return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error()) + } + // Finally, if no error occurs we Forget this item so it does not + // get queued again until another change happens. + c.workqueue.Forget(obj) + klog.V(5).Infof("Successfully synced '%s'", key) + return nil + }(obj) + + if err != nil { + runtime.HandleError(err) + return true + } + + return true +} + +// isOwnerRefUpdateRequired validates if relevant owner references is being +// set for zfs node. If not, it returns the final owner references that needs +// to be set. +func (c *NodeController) isOwnerRefsUpdateRequired(ownerRefs []metav1.OwnerReference) ([]metav1.OwnerReference, bool) { + updated := false + reqOwnerRef := c.ownerRef + for idx := range ownerRefs { + if ownerRefs[idx].UID != reqOwnerRef.UID { + continue + } + // in case owner reference exists, validate + // if controller field is set correctly or not. + if !reflect.DeepEqual(ownerRefs[idx].Controller, reqOwnerRef.Controller) { + updated = true + ownerRefs[idx].Controller = reqOwnerRef.Controller + } + return ownerRefs, updated + } + updated = true + ownerRefs = append(ownerRefs, reqOwnerRef) + return ownerRefs, updated +} diff --git a/pkg/zfs/zfs_util.go b/pkg/zfs/zfs_util.go index 6f21013..b810fe7 100644 --- a/pkg/zfs/zfs_util.go +++ b/pkg/zfs/zfs_util.go @@ -17,8 +17,10 @@ limitations under the License. package zfs import ( + "bufio" "os/exec" "path/filepath" + "strconv" "fmt" "os" @@ -30,6 +32,7 @@ import ( "github.com/openebs/lib-csi/pkg/xfs" apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/zfs/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog" ) @@ -893,3 +896,48 @@ func CreateRestore(rstr *apis.ZFSRestore) error { return nil } + +// ListZFSPool invokes `zfs list` to list all the available +// pools in the node. +func ListZFSPool() ([]apis.Pool, error) { + args := []string{ + ZFSListArg, "-s", "name", + "-o", "name,guid,available", + "-H", "-p", + } + cmd := exec.Command(ZFSVolCmd, args...) + output, err := cmd.CombinedOutput() + if err != nil { + klog.Errorf("zfs: could not list zpool cmd %v: %v", args, err) + return nil, err + } + return decodeListOutput(output) +} + +// The `zfs list` command will list down all the resources including +// pools and volumes and as the pool names cannot have "/" in the name +// the function below filters out the pools. Sample output of command: +// $ zfs list -s name -o name,guid,available -H -p +// zfspv-pool 4734063099997348493 103498467328 +// zfspv-pool/pvc-be02d230-3738-4de9-8968-70f5d10d86dd 3380225606535803752 4294942720 +func decodeListOutput(raw []byte) ([]apis.Pool, error) { + scanner := bufio.NewScanner(strings.NewReader(string(raw))) + pools := []apis.Pool{} + for scanner.Scan() { + items := strings.Split(strings.TrimSpace(scanner.Text()), "\t") + if !strings.Contains(items[0], "/") { + var pool apis.Pool + pool.Name = items[0] + pool.UUID = items[1] + sizeBytes, err := strconv.ParseInt(items[2], + 10, 64) + if err != nil { + err = fmt.Errorf("cannot get free size for pool %v: %v", pool.Name, err) + return pools, err + } + pool.Free = *resource.NewQuantity(sizeBytes, resource.BinarySI) + pools = append(pools, pool) + } + } + return pools, nil +}