feat(zfspv): adding snapshot and clone support for ZFSPV (#39)

This commits support snapshot and clone commands via CSI driver. User can create snap and clone using the following steps. 

Note:
- Snapshot is created via reconciliation CR
- Cloned volume will be on the same zpool where the snapshot is taken
- Cloned volume will have same properties as source volume. 

-----------------------------------
Create a Snapshotclass
```
kind: VolumeSnapshotClass
apiVersion: snapshot.storage.k8s.io/v1beta1
metadata:
  name: zfspv-snapclass
  annotations:
    snapshot.storage.kubernetes.io/is-default-class: "true"
driver: zfs.csi.openebs.io
deletionPolicy: Delete
```
Once snapshotclass is created, we can use this class to create a Snapshot 
```
apiVersion: snapshot.storage.k8s.io/v1beta1
kind: VolumeSnapshot
metadata:
  name: zfspv-snap
spec:
  volumeSnapshotClassName: zfspv-snapclass
  source:
    persistentVolumeClaimName: csi-zfspv
```
```
$ kubectl get volumesnapshot
NAME          AGE
zfspv-snap    7m52s
```
```
$ kubectl get volumesnapshot -o yaml
apiVersion: v1
items:
- apiVersion: snapshot.storage.k8s.io/v1beta1
  kind: VolumeSnapshot
  metadata:
    annotations:
      kubectl.kubernetes.io/last-applied-configuration: |
        {"apiVersion":"snapshot.storage.k8s.io/v1beta1","kind":"VolumeSnapshot","metadata":{"annotations":{},"name":"zfspv-snap","namespace":"default"},"spec":{"source":{"persistentVolumeClaimName":"csi-zfspv"},"volumeSnapshotClassName":"zfspv-snapclass"}}
    creationTimestamp: "2020-01-30T10:31:24Z"
    finalizers:
    - snapshot.storage.kubernetes.io/volumesnapshot-as-source-protection
    - snapshot.storage.kubernetes.io/volumesnapshot-bound-protection
    generation: 1
    name: zfspv-snap
    namespace: default
    resourceVersion: "30040"
    selfLink: /apis/snapshot.storage.k8s.io/v1beta1/namespaces/default/volumesnapshots/zfspv-snap
    uid: 1a5cf166-c599-4f58-9f3c-f1148be47fca
  spec:
    source:
      persistentVolumeClaimName: csi-zfspv
    volumeSnapshotClassName: zfspv-snapclass
  status:
    boundVolumeSnapshotContentName: snapcontent-1a5cf166-c599-4f58-9f3c-f1148be47fca
    creationTime: "2020-01-30T10:31:24Z"
    readyToUse: true
    restoreSize: "0"
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
```


Openebs resource for the created snapshot 
```
$ kubectl get snap -n openebs -o yaml
apiVersion: v1
items:
- apiVersion: openebs.io/v1alpha1
  kind: ZFSSnapshot
  metadata:
    creationTimestamp: "2020-01-30T10:31:24Z"
    finalizers:
    - zfs.openebs.io/finalizer
    generation: 2
    labels:
      kubernetes.io/nodename: pawan-2
      openebs.io/persistent-volume: pvc-18cab7c3-ec5e-4264-8507-e6f7df4c789a
    name: snapshot-1a5cf166-c599-4f58-9f3c-f1148be47fca
    namespace: openebs
    resourceVersion: "30035"
    selfLink: /apis/openebs.io/v1alpha1/namespaces/openebs/zfssnapshots/snapshot-1a5cf166-c599-4f58-9f3c-f1148be47fca
    uid: e29d571c-42b5-4fb7-9110-e1cfc9b96641
  spec:
    capacity: "4294967296"
    fsType: zfs
    ownerNodeID: pawan-2
    poolName: zfspv-pool
    status: Ready
    volumeType: DATASET
kind: List
metadata:
  resourceVersion: ""
  selfLink: ""
```

Create a clone volume
    
 We can provide a datasource as snapshot name to create a clone volume
    
```yaml
    kind: PersistentVolumeClaim
    apiVersion: v1
    metadata:
      name: zfspv-clone
    spec:
      storageClassName: openebs-zfspv
      dataSource:
        name: zfspv-snap
        kind: VolumeSnapshot
        apiGroup: snapshot.storage.k8s.io
      accessModes:
        - ReadWriteOnce
      resources:
        requests:
          storage: 4Gi
```
It will create a ZFS clone volume from the mentioned snapshot and create the PV on the same node where original volume is there.
    
Here, As resize is not supported yet, the clone PVC size should match the size of the snapshot.
Also, all the properties from the storageclass will not be considered for the clone case, it will take the properties from the snapshot and create the clone volume. One thing to note here is that, the storageclass in clone PVC should have the same poolname as that of the original volume as across the pool, clone is not supported.


Signed-off-by: Pawan <pawan@mayadata.io>
This commit is contained in:
Pawan Prakash Sharma 2020-02-13 13:31:17 +05:30 committed by GitHub
parent b0434bb537
commit 287606b78a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
40 changed files with 2995 additions and 123 deletions

1
.gitignore vendored
View file

@ -4,3 +4,4 @@ cscope*
tags
*.swp
*.swo
*.swn

View file

@ -0,0 +1,16 @@
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: zfspv-clone
spec:
storageClassName: openebs-zfspv
dataSource:
name: zfspv-snap
kind: VolumeSnapshot
apiGroup: snapshot.storage.k8s.io
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 4Gi

View file

@ -0,0 +1,18 @@
kind: VolumeSnapshotClass
apiVersion: snapshot.storage.k8s.io/v1beta1
metadata:
name: zfspv-snapclass
annotations:
snapshot.storage.kubernetes.io/is-default-class: "true"
driver: zfs.csi.openebs.io
deletionPolicy: Delete
---
apiVersion: snapshot.storage.k8s.io/v1beta1
kind: VolumeSnapshot
metadata:
name: zfspv-snap
spec:
volumeSnapshotClassName: zfspv-snapclass
source:
persistentVolumeClaimName: csi-zfspv

View file

@ -47,8 +47,450 @@ spec:
name: Filesystem
description: filesystem created on the volume
type: string
---
##############################################
########### ############
########### Snapshot CRDs ############
########### ############
##############################################
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
name: zfssnapshots.openebs.io
spec:
group: openebs.io
version: v1alpha1
scope: Namespaced
names:
plural: zfssnapshots
singular: zfssnapshot
kind: ZFSSnapshot
shortNames:
- zfssnapshot
- zfssnap
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
api-approved.kubernetes.io: "https://github.com/kubernetes-csi/external-snapshotter/pull/139"
creationTimestamp: null
name: volumesnapshotclasses.snapshot.storage.k8s.io
spec:
group: snapshot.storage.k8s.io
names:
kind: VolumeSnapshotClass
listKind: VolumeSnapshotClassList
plural: volumesnapshotclasses
singular: volumesnapshotclass
scope: Cluster
#preserveUnknownFields: false # this field is supported in kubernetes 1.15+ https://v1-15.docs.kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/
validation:
openAPIV3Schema:
description: VolumeSnapshotClass specifies parameters that a underlying storage
system uses when creating a volume snapshot. A specific VolumeSnapshotClass
is used by specifying its name in a VolumeSnapshot object. VolumeSnapshotClasses
are non-namespaced
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
type: string
deletionPolicy:
description: deletionPolicy determines whether a VolumeSnapshotContent created
through the VolumeSnapshotClass should be deleted when its bound VolumeSnapshot
is deleted. Supported values are "Retain" and "Delete". "Retain" means
that the VolumeSnapshotContent and its physical snapshot on underlying
storage system are kept. "Delete" means that the VolumeSnapshotContent
and its physical snapshot on underlying storage system are deleted. Required.
enum:
- Delete
- Retain
type: string
driver:
description: driver is the name of the storage driver that handles this
VolumeSnapshotClass. Required.
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
type: string
parameters:
additionalProperties:
type: string
description: parameters is a key-value map with storage driver specific
parameters for creating snapshots. These values are opaque to Kubernetes.
type: object
required:
- deletionPolicy
- driver
type: object
version: v1beta1
versions:
- name: v1beta1
served: true
storage: true
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
api-approved.kubernetes.io: "https://github.com/kubernetes-csi/external-snapshotter/pull/139"
creationTimestamp: null
name: volumesnapshotcontents.snapshot.storage.k8s.io
spec:
group: snapshot.storage.k8s.io
names:
kind: VolumeSnapshotContent
listKind: VolumeSnapshotContentList
plural: volumesnapshotcontents
singular: volumesnapshotcontent
scope: Cluster
subresources:
status: {}
#preserveUnknownFields: false # this field is supported in kubernetes 1.15+ https://v1-15.docs.kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/
validation:
openAPIV3Schema:
description: VolumeSnapshotContent represents the actual "on-disk" snapshot
object in the underlying storage system
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
type: string
spec:
description: spec defines properties of a VolumeSnapshotContent created
by the underlying storage system. Required.
properties:
deletionPolicy:
description: deletionPolicy determines whether this VolumeSnapshotContent
and its physical snapshot on the underlying storage system should
be deleted when its bound VolumeSnapshot is deleted. Supported values
are "Retain" and "Delete". "Retain" means that the VolumeSnapshotContent
and its physical snapshot on underlying storage system are kept. "Delete"
means that the VolumeSnapshotContent and its physical snapshot on
underlying storage system are deleted. In dynamic snapshot creation
case, this field will be filled in with the "DeletionPolicy" field
defined in the VolumeSnapshotClass the VolumeSnapshot refers to. For
pre-existing snapshots, users MUST specify this field when creating
the VolumeSnapshotContent object. Required.
enum:
- Delete
- Retain
type: string
driver:
description: driver is the name of the CSI driver used to create the
physical snapshot on the underlying storage system. This MUST be the
same as the name returned by the CSI GetPluginName() call for that
driver. Required.
type: string
source:
description: source specifies from where a snapshot will be created.
This field is immutable after creation. Required.
properties:
snapshotHandle:
description: snapshotHandle specifies the CSI "snapshot_id" of a
pre-existing snapshot on the underlying storage system. This field
is immutable.
type: string
volumeHandle:
description: volumeHandle specifies the CSI "volume_id" of the volume
from which a snapshot should be dynamically taken from. This field
is immutable.
type: string
type: object
volumeSnapshotClassName:
description: name of the VolumeSnapshotClass to which this snapshot
belongs.
type: string
volumeSnapshotRef:
description: volumeSnapshotRef specifies the VolumeSnapshot object to
which this VolumeSnapshotContent object is bound. VolumeSnapshot.Spec.VolumeSnapshotContentName
field must reference to this VolumeSnapshotContent's name for the
bidirectional binding to be valid. For a pre-existing VolumeSnapshotContent
object, name and namespace of the VolumeSnapshot object MUST be provided
for binding to happen. This field is immutable after creation. Required.
properties:
apiVersion:
description: API version of the referent.
type: string
fieldPath:
description: 'If referring to a piece of an object instead of an
entire object, this string should contain a valid JSON/Go field
access statement, such as desiredState.manifest.containers[2].
For example, if the object reference is to a container within
a pod, this would take on a value like: "spec.containers{name}"
(where "name" refers to the name of the container that triggered
the event) or if no container name is specified "spec.containers[2]"
(container with index 2 in this pod). This syntax is chosen only
to have some well-defined way of referencing a part of an object.
TODO: this design is not final and this field is subject to change
in the future.'
type: string
kind:
description: 'Kind of the referent. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
type: string
name:
description: 'Name of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#names'
type: string
namespace:
description: 'Namespace of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/namespaces/'
type: string
resourceVersion:
description: 'Specific resourceVersion to which this reference is
made, if any. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#concurrency-control-and-consistency'
type: string
uid:
description: 'UID of the referent. More info: https://kubernetes.io/docs/concepts/overview/working-with-objects/names/#uids'
type: string
type: object
required:
- deletionPolicy
- driver
- source
- volumeSnapshotRef
type: object
status:
description: status represents the current information of a snapshot.
properties:
creationTime:
description: creationTime is the timestamp when the point-in-time snapshot
is taken by the underlying storage system. In dynamic snapshot creation
case, this field will be filled in with the "creation_time" value
returned from CSI "CreateSnapshotRequest" gRPC call. For a pre-existing
snapshot, this field will be filled with the "creation_time" value
returned from the CSI "ListSnapshots" gRPC call if the driver supports
it. If not specified, it indicates the creation time is unknown. The
format of this field is a Unix nanoseconds time encoded as an int64.
On Unix, the command `date +%s%N` returns the current time in nanoseconds
since 1970-01-01 00:00:00 UTC.
format: int64
type: integer
error:
description: error is the latest observed error during snapshot creation,
if any.
properties:
message:
description: 'message is a string detailing the encountered error
during snapshot creation if specified. NOTE: message may be logged,
and it should not contain sensitive information.'
type: string
time:
description: time is the timestamp when the error was encountered.
format: date-time
type: string
type: object
readyToUse:
description: readyToUse indicates if a snapshot is ready to be used
to restore a volume. In dynamic snapshot creation case, this field
will be filled in with the "ready_to_use" value returned from CSI
"CreateSnapshotRequest" gRPC call. For a pre-existing snapshot, this
field will be filled with the "ready_to_use" value returned from the
CSI "ListSnapshots" gRPC call if the driver supports it, otherwise,
this field will be set to "True". If not specified, it means the readiness
of a snapshot is unknown.
type: boolean
restoreSize:
description: restoreSize represents the complete size of the snapshot
in bytes. In dynamic snapshot creation case, this field will be filled
in with the "size_bytes" value returned from CSI "CreateSnapshotRequest"
gRPC call. For a pre-existing snapshot, this field will be filled
with the "size_bytes" value returned from the CSI "ListSnapshots"
gRPC call if the driver supports it. When restoring a volume from
this snapshot, the size of the volume MUST NOT be smaller than the
restoreSize if it is specified, otherwise the restoration will fail.
If not specified, it indicates that the size is unknown.
format: int64
minimum: 0
type: integer
snapshotHandle:
description: snapshotHandle is the CSI "snapshot_id" of a snapshot on
the underlying storage system. If not specified, it indicates that
dynamic snapshot creation has either failed or it is still in progress.
type: string
type: object
required:
- spec
type: object
version: v1beta1
versions:
- name: v1beta1
served: true
storage: true
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
---
apiVersion: apiextensions.k8s.io/v1beta1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: (devel)
api-approved.kubernetes.io: "https://github.com/kubernetes-csi/external-snapshotter/pull/139"
creationTimestamp: null
name: volumesnapshots.snapshot.storage.k8s.io
spec:
group: snapshot.storage.k8s.io
names:
kind: VolumeSnapshot
listKind: VolumeSnapshotList
plural: volumesnapshots
singular: volumesnapshot
scope: Namespaced
subresources:
status: {}
#preserveUnknownFields: false # this field is supported in kubernetes 1.15+ https://v1-15.docs.kubernetes.io/docs/tasks/access-kubernetes-api/custom-resources/custom-resource-definitions/
validation:
openAPIV3Schema:
description: VolumeSnapshot is a user's request for either creating a point-in-time
snapshot of a persistent volume, or binding to a pre-existing snapshot.
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
of an object. Servers should convert recognized schemas to the latest
internal value, and may reject unrecognized values. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#resources'
type: string
kind:
description: 'Kind is a string value representing the REST resource this
object represents. Servers may infer this from the endpoint the client
submits requests to. Cannot be updated. In CamelCase. More info: https://git.k8s.io/community/contributors/devel/api-conventions.md#types-kinds'
type: string
spec:
description: 'spec defines the desired characteristics of a snapshot requested
by a user. More info: https://kubernetes.io/docs/concepts/storage/volume-snapshots#volumesnapshots
Required.'
properties:
source:
description: source specifies where a snapshot will be created from.
This field is immutable after creation. Required.
properties:
persistentVolumeClaimName:
description: persistentVolumeClaimName specifies the name of the
PersistentVolumeClaim object in the same namespace as the VolumeSnapshot
object where the snapshot should be dynamically taken from. This
field is immutable.
type: string
volumeSnapshotContentName:
description: volumeSnapshotContentName specifies the name of a pre-existing
VolumeSnapshotContent object. This field is immutable.
type: string
type: object
volumeSnapshotClassName:
description: 'volumeSnapshotClassName is the name of the VolumeSnapshotClass
requested by the VolumeSnapshot. If not specified, the default snapshot
class will be used if one exists. If not specified, and there is no
default snapshot class, dynamic snapshot creation will fail. Empty
string is not allowed for this field. TODO(xiangqian): a webhook validation
on empty string. More info: https://kubernetes.io/docs/concepts/storage/volume-snapshot-classes'
type: string
required:
- source
type: object
status:
description: 'status represents the current information of a snapshot. NOTE:
status can be modified by sources other than system controllers, and must
not be depended upon for accuracy. Controllers should only use information
from the VolumeSnapshotContent object after verifying that the binding
is accurate and complete.'
properties:
boundVolumeSnapshotContentName:
description: 'boundVolumeSnapshotContentName represents the name of
the VolumeSnapshotContent object to which the VolumeSnapshot object
is bound. If not specified, it indicates that the VolumeSnapshot object
has not been successfully bound to a VolumeSnapshotContent object
yet. NOTE: Specified boundVolumeSnapshotContentName alone does not
mean binding is valid. Controllers MUST always verify bidirectional
binding between VolumeSnapshot and VolumeSnapshotContent to
avoid possible security issues.'
type: string
creationTime:
description: creationTime is the timestamp when the point-in-time snapshot
is taken by the underlying storage system. In dynamic snapshot creation
case, this field will be filled in with the "creation_time" value
returned from CSI "CreateSnapshotRequest" gRPC call. For a pre-existing
snapshot, this field will be filled with the "creation_time" value
returned from the CSI "ListSnapshots" gRPC call if the driver supports
it. If not specified, it indicates that the creation time of the snapshot
is unknown.
format: date-time
type: string
error:
description: error is the last observed error during snapshot creation,
if any. This field could be helpful to upper level controllers(i.e.,
application controller) to decide whether they should continue on
waiting for the snapshot to be created based on the type of error
reported.
properties:
message:
description: 'message is a string detailing the encountered error
during snapshot creation if specified. NOTE: message may be logged,
and it should not contain sensitive information.'
type: string
time:
description: time is the timestamp when the error was encountered.
format: date-time
type: string
type: object
readyToUse:
description: readyToUse indicates if a snapshot is ready to be used
to restore a volume. In dynamic snapshot creation case, this field
will be filled in with the "ready_to_use" value returned from CSI
"CreateSnapshotRequest" gRPC call. For a pre-existing snapshot, this
field will be filled with the "ready_to_use" value returned from the
CSI "ListSnapshots" gRPC call if the driver supports it, otherwise,
this field will be set to "True". If not specified, it means the readiness
of a snapshot is unknown.
type: boolean
restoreSize:
description: restoreSize represents the complete size of the snapshot
in bytes. In dynamic snapshot creation case, this field will be filled
in with the "size_bytes" value returned from CSI "CreateSnapshotRequest"
gRPC call. For a pre-existing snapshot, this field will be filled
with the "size_bytes" value returned from the CSI "ListSnapshots"
gRPC call if the driver supports it. When restoring a volume from
this snapshot, the size of the volume MUST NOT be smaller than the
restoreSize if it is specified, otherwise the restoration will fail.
If not specified, it indicates that the size is unknown.
type: string
type: object
required:
- spec
type: object
version: v1beta1
versions:
- name: v1beta1
served: true
storage: true
status:
acceptedNames:
kind: ""
plural: ""
conditions: []
storedVersions: []
---
##############################################
########### ############
########### Controller plugin ############
@ -86,7 +528,7 @@ rules:
resources: ["leases"]
verbs: ["get", "watch", "list", "delete", "update", "create"]
- apiGroups: ["*"]
resources: ["zfsvolumes"]
resources: ["zfsvolumes", "zfssnapshots"]
verbs: ["*"]
---
@ -135,6 +577,25 @@ spec:
priorityClassName: system-cluster-critical
serviceAccount: openebs-zfs-controller-sa
containers:
- name: csi-snapshotter
image: quay.io/k8scsi/csi-snapshotter:v2.0.1
imagePullPolicy: IfNotPresent
args:
- "--csi-address=$(ADDRESS)"
- "--leader-election"
env:
- name: ADDRESS
value: /var/lib/csi/sockets/pluginproxy/csi.sock
imagePullPolicy: Always
volumeMounts:
- name: socket-dir
mountPath: /var/lib/csi/sockets/pluginproxy/
- name: snapshot-controller
image: quay.io/k8scsi/snapshot-controller:v2.0.1
args:
- "--v=5"
- "--leader-election=false"
imagePullPolicy: IfNotPresent
- name: csi-provisioner
image: quay.io/k8scsi/csi-provisioner:v1.4.0
imagePullPolicy: IfNotPresent
@ -265,6 +726,12 @@ rules:
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots"]
verbs: ["get", "list", "watch", "update"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshotcontents/status"]
verbs: ["update"]
- apiGroups: ["snapshot.storage.k8s.io"]
resources: ["volumesnapshots/status"]
verbs: ["update"]
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["create", "list", "watch", "delete"]
@ -337,7 +804,7 @@ rules:
resources: ["persistentvolumes", "nodes", "services"]
verbs: ["get", "list"]
- apiGroups: ["*"]
resources: ["zfsvolumes"]
resources: ["zfsvolumes", "zfssnapshots"]
verbs: ["get", "list", "watch", "create", "update", "patch"]
---

View file

@ -71,6 +71,8 @@ func addKnownTypes(scheme *runtime.Scheme) error {
SchemeGroupVersion,
&ZFSVolume{},
&ZFSVolumeList{},
&ZFSSnapshot{},
&ZFSSnapshotList{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
return nil

View file

@ -0,0 +1,49 @@
/*
Copyright © 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +resource:path=zfssnapshot
// ZFSSnapshot represents a ZFS Snapshot of the zfsvolume
type ZFSSnapshot struct {
metav1.TypeMeta `json:",inline"`
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec VolumeInfo `json:"spec"`
Status SnapStatus `json:"status"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +resource:path=zfssnapshots
// ZFSSnapshotList is a list of ZFSSnapshot resources
type ZFSSnapshotList struct {
metav1.TypeMeta `json:",inline"`
metav1.ListMeta `json:"metadata"`
Items []ZFSSnapshot `json:"items"`
}
type SnapStatus struct {
State string `json:"state,omitempty"`
}

View file

@ -58,7 +58,7 @@ type MountInfo struct {
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// +resource:path=csivolumes
// +resource:path=zfsvolumes
// ZFSVolumeList is a list of ZFSVolume resources
type ZFSVolumeList struct {
@ -79,6 +79,10 @@ type VolumeInfo struct {
// pool where this volume should be created
PoolName string `json:"poolName"`
// SnapName specifies the name of the
// snapshot where this volume should be cloned
SnapName string `json:"snapname,omitempty"`
// Capacity of the volume
Capacity string `json:"capacity"`

View file

@ -50,6 +50,22 @@ func (in *MountInfo) DeepCopy() *MountInfo {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *SnapStatus) DeepCopyInto(out *SnapStatus) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new SnapStatus.
func (in *SnapStatus) DeepCopy() *SnapStatus {
if in == nil {
return nil
}
out := new(SnapStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *VolumeInfo) DeepCopyInto(out *VolumeInfo) {
*out = *in
@ -66,6 +82,67 @@ func (in *VolumeInfo) DeepCopy() *VolumeInfo {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ZFSSnapshot) DeepCopyInto(out *ZFSSnapshot) {
*out = *in
out.TypeMeta = in.TypeMeta
in.ObjectMeta.DeepCopyInto(&out.ObjectMeta)
out.Spec = in.Spec
out.Status = in.Status
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ZFSSnapshot.
func (in *ZFSSnapshot) DeepCopy() *ZFSSnapshot {
if in == nil {
return nil
}
out := new(ZFSSnapshot)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ZFSSnapshot) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ZFSSnapshotList) DeepCopyInto(out *ZFSSnapshotList) {
*out = *in
out.TypeMeta = in.TypeMeta
out.ListMeta = in.ListMeta
if in.Items != nil {
in, out := &in.Items, &out.Items
*out = make([]ZFSSnapshot, len(*in))
for i := range *in {
(*in)[i].DeepCopyInto(&(*out)[i])
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ZFSSnapshotList.
func (in *ZFSSnapshotList) DeepCopy() *ZFSSnapshotList {
if in == nil {
return nil
}
out := new(ZFSSnapshotList)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *ZFSSnapshotList) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ZFSVolume) DeepCopyInto(out *ZFSVolume) {
*out = *in

View file

@ -0,0 +1,116 @@
/*
Copyright 2020 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapbuilder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/common/errors"
)
// Builder is the builder object for ZFSSnapshot
type Builder struct {
snap *ZFSSnapshot
errs []error
}
// NewBuilder returns new instance of Builder
func NewBuilder() *Builder {
return &Builder{
snap: &ZFSSnapshot{
Object: &apis.ZFSSnapshot{},
},
}
}
// BuildFrom returns new instance of Builder
// from the provided api instance
func BuildFrom(snap *apis.ZFSSnapshot) *Builder {
if snap == nil {
b := NewBuilder()
b.errs = append(
b.errs,
errors.New("failed to build snap object: nil snap"),
)
return b
}
return &Builder{
snap: &ZFSSnapshot{
Object: snap,
},
}
}
// WithNamespace sets the namespace of ZFSSnapshot
func (b *Builder) WithNamespace(namespace string) *Builder {
if namespace == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi snap object: missing namespace",
),
)
return b
}
b.snap.Object.Namespace = namespace
return b
}
// WithName sets the name of ZFSSnapshot
func (b *Builder) WithName(name string) *Builder {
if name == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi snap object: missing name",
),
)
return b
}
b.snap.Object.Name = name
return b
}
// WithLabels merges existing labels if any
// with the ones that are provided here
func (b *Builder) WithLabels(labels map[string]string) *Builder {
if len(labels) == 0 {
return b
}
if b.snap.Object.Labels == nil {
b.snap.Object.Labels = map[string]string{}
}
for key, value := range labels {
b.snap.Object.Labels[key] = value
}
return b
}
func (b *Builder) WithFinalizer(finalizer []string) *Builder {
b.snap.Object.Finalizers = append(b.snap.Object.Finalizers, finalizer...)
return b
}
// Build returns ZFSSnapshot API object
func (b *Builder) Build() (*apis.ZFSSnapshot, error) {
if len(b.errs) > 0 {
return nil, errors.Errorf("%+v", b.errs)
}
return b.snap.Object, nil
}

View file

@ -0,0 +1,72 @@
/*
Copyright 2020 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapbuilder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
)
// ListBuilder enables building an instance of
// ZFSSnapshotList
type ListBuilder struct {
list *apis.ZFSSnapshotList
filters predicateList
}
// NewListBuilder returns a new instance of ListBuilder
func NewListBuilder() *ListBuilder {
return &ListBuilder{
list: &apis.ZFSSnapshotList{},
}
}
// ListBuilderFrom returns a new instance of
// ListBuilder from API list instance
func ListBuilderFrom(vols apis.ZFSSnapshotList) *ListBuilder {
b := &ListBuilder{list: &apis.ZFSSnapshotList{}}
if len(vols.Items) == 0 {
return b
}
b.list.Items = append(b.list.Items, vols.Items...)
return b
}
// List returns the list of pod
// instances that was built by this
// builder
func (b *ListBuilder) List() *apis.ZFSSnapshotList {
if b.filters == nil || len(b.filters) == 0 {
return b.list
}
filtered := &apis.ZFSSnapshotList{}
for _, vol := range b.list.Items {
vol := vol // pin it
if b.filters.all(From(&vol)) {
filtered.Items = append(filtered.Items, vol)
}
}
return filtered
}
// WithFilter add filters on which the pod
// has to be filtered
func (b *ListBuilder) WithFilter(pred ...Predicate) *ListBuilder {
b.filters = append(b.filters, pred...)
return b
}

View file

@ -0,0 +1,427 @@
// Copyright © 2020 The OpenEBS Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package snapbuilder
import (
"encoding/json"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
client "github.com/openebs/zfs-localpv/pkg/common/kubernetes/client"
clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset"
"github.com/pkg/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// getClientsetFn is a typed function that
// abstracts fetching of internal clientset
type getClientsetFn func() (clientset *clientset.Clientset, err error)
// getClientsetFromPathFn is a typed function that
// abstracts fetching of clientset from kubeConfigPath
type getClientsetForPathFn func(kubeConfigPath string) (
clientset *clientset.Clientset,
err error,
)
// createFn is a typed function that abstracts
// creating zfssnap volume instance
type createFn func(
cs *clientset.Clientset,
upgradeResultObj *apis.ZFSSnapshot,
namespace string,
) (*apis.ZFSSnapshot, error)
// getFn is a typed function that abstracts
// fetching a zfssnap volume instance
type getFn func(
cli *clientset.Clientset,
name,
namespace string,
opts metav1.GetOptions,
) (*apis.ZFSSnapshot, error)
// listFn is a typed function that abstracts
// listing of zfssnap volume instances
type listFn func(
cli *clientset.Clientset,
namespace string,
opts metav1.ListOptions,
) (*apis.ZFSSnapshotList, error)
// delFn is a typed function that abstracts
// deleting a zfssnap volume instance
type delFn func(
cli *clientset.Clientset,
name,
namespace string,
opts *metav1.DeleteOptions,
) error
// updateFn is a typed function that abstracts
// updating zfssnap volume instance
type updateFn func(
cs *clientset.Clientset,
vol *apis.ZFSSnapshot,
namespace string,
) (*apis.ZFSSnapshot, error)
// Kubeclient enables kubernetes API operations
// on zfssnap volume instance
type Kubeclient struct {
// clientset refers to zfssnap volume's
// clientset that will be responsible to
// make kubernetes API calls
clientset *clientset.Clientset
kubeConfigPath string
// namespace holds the namespace on which
// kubeclient has to operate
namespace string
// functions useful during mocking
getClientset getClientsetFn
getClientsetForPath getClientsetForPathFn
get getFn
list listFn
del delFn
create createFn
update updateFn
}
// KubeclientBuildOption defines the abstraction
// to build a kubeclient instance
type KubeclientBuildOption func(*Kubeclient)
// defaultGetClientset is the default implementation to
// get kubernetes clientset instance
func defaultGetClientset() (clients *clientset.Clientset, err error) {
config, err := client.GetConfig(client.New())
if err != nil {
return nil, err
}
return clientset.NewForConfig(config)
}
// defaultGetClientsetForPath is the default implementation to
// get kubernetes clientset instance based on the given
// kubeconfig path
func defaultGetClientsetForPath(
kubeConfigPath string,
) (clients *clientset.Clientset, err error) {
config, err := client.GetConfig(
client.New(client.WithKubeConfigPath(kubeConfigPath)))
if err != nil {
return nil, err
}
return clientset.NewForConfig(config)
}
// defaultGet is the default implementation to get
// a zfssnap volume instance in kubernetes cluster
func defaultGet(
cli *clientset.Clientset,
name, namespace string,
opts metav1.GetOptions,
) (*apis.ZFSSnapshot, error) {
return cli.OpenebsV1alpha1().
ZFSSnapshots(namespace).
Get(name, opts)
}
// defaultList is the default implementation to list
// zfssnap volume instances in kubernetes cluster
func defaultList(
cli *clientset.Clientset,
namespace string,
opts metav1.ListOptions,
) (*apis.ZFSSnapshotList, error) {
return cli.OpenebsV1alpha1().
ZFSSnapshots(namespace).
List(opts)
}
// defaultCreate is the default implementation to delete
// a zfssnap volume instance in kubernetes cluster
func defaultDel(
cli *clientset.Clientset,
name, namespace string,
opts *metav1.DeleteOptions,
) error {
deletePropagation := metav1.DeletePropagationForeground
opts.PropagationPolicy = &deletePropagation
err := cli.OpenebsV1alpha1().
ZFSSnapshots(namespace).
Delete(name, opts)
return err
}
// defaultCreate is the default implementation to create
// a zfssnap volume instance in kubernetes cluster
func defaultCreate(
cli *clientset.Clientset,
vol *apis.ZFSSnapshot,
namespace string,
) (*apis.ZFSSnapshot, error) {
return cli.OpenebsV1alpha1().
ZFSSnapshots(namespace).
Create(vol)
}
// defaultUpdate is the default implementation to update
// a zfssnap volume instance in kubernetes cluster
func defaultUpdate(
cli *clientset.Clientset,
vol *apis.ZFSSnapshot,
namespace string,
) (*apis.ZFSSnapshot, error) {
return cli.OpenebsV1alpha1().
ZFSSnapshots(namespace).
Update(vol)
}
// withDefaults sets the default options
// of kubeclient instance
func (k *Kubeclient) withDefaults() {
if k.getClientset == nil {
k.getClientset = defaultGetClientset
}
if k.getClientsetForPath == nil {
k.getClientsetForPath = defaultGetClientsetForPath
}
if k.get == nil {
k.get = defaultGet
}
if k.list == nil {
k.list = defaultList
}
if k.del == nil {
k.del = defaultDel
}
if k.create == nil {
k.create = defaultCreate
}
if k.update == nil {
k.update = defaultUpdate
}
}
// WithClientSet sets the kubernetes client against
// the kubeclient instance
func WithClientSet(c *clientset.Clientset) KubeclientBuildOption {
return func(k *Kubeclient) {
k.clientset = c
}
}
// WithNamespace sets the kubernetes client against
// the provided namespace
func WithNamespace(namespace string) KubeclientBuildOption {
return func(k *Kubeclient) {
k.namespace = namespace
}
}
// WithNamespace sets the provided namespace
// against this Kubeclient instance
func (k *Kubeclient) WithNamespace(namespace string) *Kubeclient {
k.namespace = namespace
return k
}
// WithKubeConfigPath sets the kubernetes client
// against the provided path
func WithKubeConfigPath(path string) KubeclientBuildOption {
return func(k *Kubeclient) {
k.kubeConfigPath = path
}
}
// NewKubeclient returns a new instance of
// kubeclient meant for zfssnap volume operations
func NewKubeclient(opts ...KubeclientBuildOption) *Kubeclient {
k := &Kubeclient{}
for _, o := range opts {
o(k)
}
k.withDefaults()
return k
}
func (k *Kubeclient) getClientsetForPathOrDirect() (
*clientset.Clientset,
error,
) {
if k.kubeConfigPath != "" {
return k.getClientsetForPath(k.kubeConfigPath)
}
return k.getClientset()
}
// getClientOrCached returns either a new instance
// of kubernetes client or its cached copy
func (k *Kubeclient) getClientOrCached() (*clientset.Clientset, error) {
if k.clientset != nil {
return k.clientset, nil
}
c, err := k.getClientsetForPathOrDirect()
if err != nil {
return nil,
errors.Wrapf(
err,
"failed to get clientset",
)
}
k.clientset = c
return k.clientset, nil
}
// Create creates a zfssnap volume instance
// in kubernetes cluster
func (k *Kubeclient) Create(vol *apis.ZFSSnapshot) (*apis.ZFSSnapshot, error) {
if vol == nil {
return nil,
errors.New(
"failed to create csivolume: nil vol object",
)
}
cs, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to create zfssnap volume {%s} in namespace {%s}",
vol.Name,
k.namespace,
)
}
return k.create(cs, vol, k.namespace)
}
// Get returns zfssnap volume object for given name
func (k *Kubeclient) Get(
name string,
opts metav1.GetOptions,
) (*apis.ZFSSnapshot, error) {
if name == "" {
return nil,
errors.New(
"failed to get zfssnap volume: missing zfssnap volume name",
)
}
cli, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to get zfssnap volume {%s} in namespace {%s}",
name,
k.namespace,
)
}
return k.get(cli, name, k.namespace, opts)
}
// GetRaw returns zfssnap volume instance
// in bytes
func (k *Kubeclient) GetRaw(
name string,
opts metav1.GetOptions,
) ([]byte, error) {
if name == "" {
return nil, errors.New(
"failed to get raw zfssnap volume: missing vol name",
)
}
csiv, err := k.Get(name, opts)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to get zfssnap volume {%s} in namespace {%s}",
name,
k.namespace,
)
}
return json.Marshal(csiv)
}
// List returns a list of zfssnap volume
// instances present in kubernetes cluster
func (k *Kubeclient) List(opts metav1.ListOptions) (*apis.ZFSSnapshotList, error) {
cli, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to list zfssnap volumes in namespace {%s}",
k.namespace,
)
}
return k.list(cli, k.namespace, opts)
}
// Delete deletes the zfssnap volume from
// kubernetes
func (k *Kubeclient) Delete(name string) error {
if name == "" {
return errors.New(
"failed to delete csivolume: missing vol name",
)
}
cli, err := k.getClientOrCached()
if err != nil {
return errors.Wrapf(
err,
"failed to delete csivolume {%s} in namespace {%s}",
name,
k.namespace,
)
}
return k.del(cli, name, k.namespace, &metav1.DeleteOptions{})
}
// Update updates this zfssnap volume instance
// against kubernetes cluster
func (k *Kubeclient) Update(vol *apis.ZFSSnapshot) (*apis.ZFSSnapshot, error) {
if vol == nil {
return nil,
errors.New(
"failed to update csivolume: nil vol object",
)
}
cs, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to update csivolume {%s} in namespace {%s}",
vol.Name,
vol.Namespace,
)
}
return k.update(cs, vol, k.namespace)
}

View file

@ -0,0 +1,117 @@
// Copyright © 2020 The OpenEBS Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package snapbuilder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
)
// ZFSSnapshot is a wrapper over
// ZFSSnapshot API instance
type ZFSSnapshot struct {
// ZFSSnap object
Object *apis.ZFSSnapshot
}
// From returns a new instance of
// zfssnap volume
func From(snap *apis.ZFSSnapshot) *ZFSSnapshot {
return &ZFSSnapshot{
Object: snap,
}
}
// Predicate defines an abstraction
// to determine conditional checks
// against the provided pod instance
type Predicate func(*ZFSSnapshot) bool
// PredicateList holds a list of predicate
type predicateList []Predicate
// ZFSSnapshotList holds the list
// of zfs snapshot instances
type ZFSSnapshotList struct {
// List contains list of snapshots
List apis.ZFSSnapshotList
}
// Len returns the number of items present
// in the ZFSSnapshotList
func (snapList *ZFSSnapshotList) Len() int {
return len(snapList.List.Items)
}
// all returns true if all the predicates
// succeed against the provided ZFSSnapshot
// instance
func (l predicateList) all(snap *ZFSSnapshot) bool {
for _, pred := range l {
if !pred(snap) {
return false
}
}
return true
}
// HasLabels returns true if provided labels
// are present in the provided ZFSSnapshot instance
func HasLabels(keyValuePair map[string]string) Predicate {
return func(snap *ZFSSnapshot) bool {
for key, value := range keyValuePair {
if !snap.HasLabel(key, value) {
return false
}
}
return true
}
}
// HasLabel returns true if provided label
// is present in the provided ZFSSnapshot instance
func (snap *ZFSSnapshot) HasLabel(key, value string) bool {
val, ok := snap.Object.GetLabels()[key]
if ok {
return val == value
}
return false
}
// HasLabel returns true if provided label
// is present in the provided ZFSSnapshot instance
func HasLabel(key, value string) Predicate {
return func(snap *ZFSSnapshot) bool {
return snap.HasLabel(key, value)
}
}
// IsNil returns true if the zfssnap volume instance
// is nil
func (snap *ZFSSnapshot) IsNil() bool {
return snap.Object == nil
}
// IsNil is predicate to filter out nil zfssnap volume
// instances
func IsNil() Predicate {
return func(snap *ZFSSnapshot) bool {
return snap.IsNil()
}
}
// GetAPIObject returns zfssnap volume's API instance
func (snap *ZFSSnapshot) GetAPIObject() *apis.ZFSSnapshot {
return snap.Object
}

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package builder
package volbuilder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
@ -60,7 +60,7 @@ func (b *Builder) WithNamespace(namespace string) *Builder {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing namespace",
"failed to build zfs volume object: missing namespace",
),
)
return b
@ -75,7 +75,7 @@ func (b *Builder) WithName(name string) *Builder {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing name",
"failed to build zfs volume object: missing name",
),
)
return b
@ -84,14 +84,14 @@ func (b *Builder) WithName(name string) *Builder {
return b
}
// WithCapacity sets the Capacity of csi volume by converting string
// WithCapacity sets the Capacity of zfs volume by converting string
// capacity into Quantity
func (b *Builder) WithCapacity(capacity string) *Builder {
if capacity == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing capacity",
"failed to build zfs volume object: missing capacity",
),
)
return b
@ -166,12 +166,18 @@ func (b *Builder) WithFsType(fstype string) *Builder {
return b
}
// WithSnapshot sets Snapshot name for creating clone volume
func (b *Builder) WithSnapshot(snap string) *Builder {
b.volume.Object.Spec.SnapName = snap
return b
}
func (b *Builder) WithPoolName(pool string) *Builder {
if pool == "" {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing pool name",
"failed to build zfs volume object: missing pool name",
),
)
return b
@ -185,7 +191,7 @@ func (b *Builder) WithNodename(name string) *Builder {
b.errs = append(
b.errs,
errors.New(
"failed to build csi volume object: missing node name",
"failed to build zfs volume object: missing node name",
),
)
return b
@ -198,10 +204,6 @@ func (b *Builder) WithNodename(name string) *Builder {
// with the ones that are provided here
func (b *Builder) WithLabels(labels map[string]string) *Builder {
if len(labels) == 0 {
b.errs = append(
b.errs,
errors.New("failed to build zfs volume object: missing labels"),
)
return b
}
@ -220,7 +222,7 @@ func (b *Builder) WithFinalizer(finalizer []string) *Builder {
return b
}
// Build returns csi volume API object
// Build returns ZFSVolume API object
func (b *Builder) Build() (*apis.ZFSVolume, error) {
if len(b.errs) > 0 {
return nil, errors.Errorf("%+v", b.errs)

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package builder
package volbuilder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package builder
package volbuilder
import (
"encoding/json"
@ -36,7 +36,7 @@ type getClientsetForPathFn func(kubeConfigPath string) (
)
// createFn is a typed function that abstracts
// creating csi volume instance
// creating zfs volume instance
type createFn func(
cs *clientset.Clientset,
upgradeResultObj *apis.ZFSVolume,
@ -44,7 +44,7 @@ type createFn func(
) (*apis.ZFSVolume, error)
// getFn is a typed function that abstracts
// fetching a csi volume instance
// fetching a zfs volume instance
type getFn func(
cli *clientset.Clientset,
name,
@ -53,7 +53,7 @@ type getFn func(
) (*apis.ZFSVolume, error)
// listFn is a typed function that abstracts
// listing of csi volume instances
// listing of zfs volume instances
type listFn func(
cli *clientset.Clientset,
namespace string,
@ -61,7 +61,7 @@ type listFn func(
) (*apis.ZFSVolumeList, error)
// delFn is a typed function that abstracts
// deleting a csi volume instance
// deleting a zfs volume instance
type delFn func(
cli *clientset.Clientset,
name,
@ -70,7 +70,7 @@ type delFn func(
) error
// updateFn is a typed function that abstracts
// updating csi volume instance
// updating zfs volume instance
type updateFn func(
cs *clientset.Clientset,
vol *apis.ZFSVolume,
@ -78,9 +78,9 @@ type updateFn func(
) (*apis.ZFSVolume, error)
// Kubeclient enables kubernetes API operations
// on csi volume instance
// on zfs volume instance
type Kubeclient struct {
// clientset refers to csi volume's
// clientset refers to zfs volume's
// clientset that will be responsible to
// make kubernetes API calls
clientset *clientset.Clientset
@ -134,7 +134,7 @@ func defaultGetClientsetForPath(
}
// defaultGet is the default implementation to get
// a csi volume instance in kubernetes cluster
// a zfs volume instance in kubernetes cluster
func defaultGet(
cli *clientset.Clientset,
name, namespace string,
@ -146,7 +146,7 @@ func defaultGet(
}
// defaultList is the default implementation to list
// csi volume instances in kubernetes cluster
// zfs volume instances in kubernetes cluster
func defaultList(
cli *clientset.Clientset,
namespace string,
@ -158,7 +158,7 @@ func defaultList(
}
// defaultCreate is the default implementation to delete
// a csi volume instance in kubernetes cluster
// a zfs volume instance in kubernetes cluster
func defaultDel(
cli *clientset.Clientset,
name, namespace string,
@ -173,7 +173,7 @@ func defaultDel(
}
// defaultCreate is the default implementation to create
// a csi volume instance in kubernetes cluster
// a zfs volume instance in kubernetes cluster
func defaultCreate(
cli *clientset.Clientset,
vol *apis.ZFSVolume,
@ -185,7 +185,7 @@ func defaultCreate(
}
// defaultUpdate is the default implementation to update
// a csi volume instance in kubernetes cluster
// a zfs volume instance in kubernetes cluster
func defaultUpdate(
cli *clientset.Clientset,
vol *apis.ZFSVolume,
@ -254,7 +254,7 @@ func WithKubeConfigPath(path string) KubeclientBuildOption {
}
// NewKubeclient returns a new instance of
// kubeclient meant for csi volume operations
// kubeclient meant for zfs volume operations
func NewKubeclient(opts ...KubeclientBuildOption) *Kubeclient {
k := &Kubeclient{}
for _, o := range opts {
@ -296,7 +296,7 @@ func (k *Kubeclient) getClientOrCached() (*clientset.Clientset, error) {
return k.clientset, nil
}
// Create creates a csi volume instance
// Create creates a zfs volume instance
// in kubernetes cluster
func (k *Kubeclient) Create(vol *apis.ZFSVolume) (*apis.ZFSVolume, error) {
if vol == nil {
@ -309,7 +309,7 @@ func (k *Kubeclient) Create(vol *apis.ZFSVolume) (*apis.ZFSVolume, error) {
if err != nil {
return nil, errors.Wrapf(
err,
"failed to create csi volume {%s} in namespace {%s}",
"failed to create zfs volume {%s} in namespace {%s}",
vol.Name,
k.namespace,
)
@ -318,7 +318,7 @@ func (k *Kubeclient) Create(vol *apis.ZFSVolume) (*apis.ZFSVolume, error) {
return k.create(cs, vol, k.namespace)
}
// Get returns csi volume object for given name
// Get returns zfs volume object for given name
func (k *Kubeclient) Get(
name string,
opts metav1.GetOptions,
@ -326,7 +326,7 @@ func (k *Kubeclient) Get(
if name == "" {
return nil,
errors.New(
"failed to get csi volume: missing csi volume name",
"failed to get zfs volume: missing zfs volume name",
)
}
@ -334,7 +334,7 @@ func (k *Kubeclient) Get(
if err != nil {
return nil, errors.Wrapf(
err,
"failed to get csi volume {%s} in namespace {%s}",
"failed to get zfs volume {%s} in namespace {%s}",
name,
k.namespace,
)
@ -343,7 +343,7 @@ func (k *Kubeclient) Get(
return k.get(cli, name, k.namespace, opts)
}
// GetRaw returns csi volume instance
// GetRaw returns zfs volume instance
// in bytes
func (k *Kubeclient) GetRaw(
name string,
@ -351,14 +351,14 @@ func (k *Kubeclient) GetRaw(
) ([]byte, error) {
if name == "" {
return nil, errors.New(
"failed to get raw csi volume: missing vol name",
"failed to get raw zfs volume: missing vol name",
)
}
csiv, err := k.Get(name, opts)
if err != nil {
return nil, errors.Wrapf(
err,
"failed to get csi volume {%s} in namespace {%s}",
"failed to get zfs volume {%s} in namespace {%s}",
name,
k.namespace,
)
@ -367,14 +367,14 @@ func (k *Kubeclient) GetRaw(
return json.Marshal(csiv)
}
// List returns a list of csi volume
// List returns a list of zfs volume
// instances present in kubernetes cluster
func (k *Kubeclient) List(opts metav1.ListOptions) (*apis.ZFSVolumeList, error) {
cli, err := k.getClientOrCached()
if err != nil {
return nil, errors.Wrapf(
err,
"failed to list csi volumes in namespace {%s}",
"failed to list zfs volumes in namespace {%s}",
k.namespace,
)
}
@ -382,7 +382,7 @@ func (k *Kubeclient) List(opts metav1.ListOptions) (*apis.ZFSVolumeList, error)
return k.list(cli, k.namespace, opts)
}
// Delete deletes the csi volume from
// Delete deletes the zfs volume from
// kubernetes
func (k *Kubeclient) Delete(name string) error {
if name == "" {
@ -403,7 +403,7 @@ func (k *Kubeclient) Delete(name string) error {
return k.del(cli, name, k.namespace, &metav1.DeleteOptions{})
}
// Update updates this csi volume instance
// Update updates this zfs volume instance
// against kubernetes cluster
func (k *Kubeclient) Update(vol *apis.ZFSVolume) (*apis.ZFSVolume, error) {
if vol == nil {

View file

@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package builder
package volbuilder
import (
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
@ -21,11 +21,12 @@ import (
// ZFSVolume is a wrapper over
// ZFSVolume API instance
type ZFSVolume struct {
// ZFSVolume object
Object *apis.ZFSVolume
}
// From returns a new instance of
// csi volume
// zfs volume
func From(vol *apis.ZFSVolume) *ZFSVolume {
return &ZFSVolume{
Object: vol,
@ -41,23 +42,24 @@ type Predicate func(*ZFSVolume) bool
type predicateList []Predicate
// ZFSVolumeList holds the list
// of csi volume instances
// of zfs volume instances
type ZFSVolumeList struct {
// List conatils list of volumes
List apis.ZFSVolumeList
}
// Len returns the number of items present
// in the ZFSVolumeList
func (p *ZFSVolumeList) Len() int {
return len(p.List.Items)
func (volList *ZFSVolumeList) Len() int {
return len(volList.List.Items)
}
// all returns true if all the predicates
// succeed against the provided ZFSVolume
// instance
func (l predicateList) all(p *ZFSVolume) bool {
func (l predicateList) all(vol *ZFSVolume) bool {
for _, pred := range l {
if !pred(p) {
if !pred(vol) {
return false
}
}
@ -67,9 +69,9 @@ func (l predicateList) all(p *ZFSVolume) bool {
// HasLabels returns true if provided labels
// are present in the provided ZFSVolume instance
func HasLabels(keyValuePair map[string]string) Predicate {
return func(p *ZFSVolume) bool {
return func(vol *ZFSVolume) bool {
for key, value := range keyValuePair {
if !p.HasLabel(key, value) {
if !vol.HasLabel(key, value) {
return false
}
}
@ -79,8 +81,8 @@ func HasLabels(keyValuePair map[string]string) Predicate {
// HasLabel returns true if provided label
// is present in the provided ZFSVolume instance
func (p *ZFSVolume) HasLabel(key, value string) bool {
val, ok := p.Object.GetLabels()[key]
func (vol *ZFSVolume) HasLabel(key, value string) bool {
val, ok := vol.Object.GetLabels()[key]
if ok {
return val == value
}
@ -90,26 +92,26 @@ func (p *ZFSVolume) HasLabel(key, value string) bool {
// HasLabel returns true if provided label
// is present in the provided ZFSVolume instance
func HasLabel(key, value string) Predicate {
return func(p *ZFSVolume) bool {
return p.HasLabel(key, value)
return func(vol *ZFSVolume) bool {
return vol.HasLabel(key, value)
}
}
// IsNil returns true if the csi volume instance
// IsNil returns true if the zfs volume instance
// is nil
func (p *ZFSVolume) IsNil() bool {
return p.Object == nil
func (vol *ZFSVolume) IsNil() bool {
return vol.Object == nil
}
// IsNil is predicate to filter out nil csi volume
// IsNil is predicate to filter out nil zfs volume
// instances
func IsNil() Predicate {
return func(p *ZFSVolume) bool {
return p.IsNil()
return func(vol *ZFSVolume) bool {
return vol.IsNil()
}
}
// GetAPIObject returns csi volume's API instance
func (p *ZFSVolume) GetAPIObject() *apis.ZFSVolume {
return p.Object
// GetAPIObject returns zfs volume's API instance
func (vol *ZFSVolume) GetAPIObject() *apis.ZFSVolume {
return vol.Object
}

View file

@ -20,14 +20,16 @@ import (
"github.com/Sirupsen/logrus"
"github.com/container-storage-interface/spec/lib/go/csi"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/builder"
"github.com/openebs/zfs-localpv/pkg/mgmt"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"github.com/openebs/zfs-localpv/pkg/mgmt/snapshot"
"github.com/openebs/zfs-localpv/pkg/mgmt/volume"
"github.com/openebs/zfs-localpv/pkg/zfs"
"golang.org/x/net/context"
"golang.org/x/sys/unix"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
"sync"
)
@ -41,14 +43,26 @@ type node struct {
// of CSI NodeServer
func NewNode(d *CSIDriver) csi.NodeServer {
var ControllerMutex = sync.RWMutex{}
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
// start the zfsvolume watcher
go func() {
err := mgmt.Start(&ControllerMutex)
err := volume.Start(&ControllerMutex, stopCh)
if err != nil {
logrus.Fatalf("Failed to start ZFS volume management controller: %s", err.Error())
}
}()
// start the snapshot watcher
go func() {
err := snapshot.Start(&ControllerMutex, stopCh)
if err != nil {
logrus.Fatalf("Failed to start ZFS volume snapshot management controller: %s", err.Error())
}
}()
return &node{
driver: d,
}
@ -65,7 +79,7 @@ func GetVolAndMountInfo(
mountinfo.MountOptions = append(mountinfo.MountOptions, req.GetVolumeCapability().GetMount().GetMountFlags()...)
getOptions := metav1.GetOptions{}
vol, err := builder.NewKubeclient().
vol, err := volbuilder.NewKubeclient().
WithNamespace(zfs.OpenEBSNamespace).
Get(req.GetVolumeId(), getOptions)

View file

@ -19,10 +19,13 @@ package driver
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/openebs/zfs-localpv/pkg/builder"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
errors "github.com/openebs/zfs-localpv/pkg/common/errors"
csipayload "github.com/openebs/zfs-localpv/pkg/response"
zfs "github.com/openebs/zfs-localpv/pkg/zfs"
@ -55,18 +58,7 @@ var SupportedVolumeCapabilityAccessModes = []*csi.VolumeCapability_AccessMode{
},
}
// CreateVolume provisions a volume
func (cs *controller) CreateVolume(
ctx context.Context,
req *csi.CreateVolumeRequest,
) (*csi.CreateVolumeResponse, error) {
var err error
if err = cs.validateVolumeCreateReq(req); err != nil {
return nil, err
}
func CreateZFSVolume(req *csi.CreateVolumeRequest) (string, error) {
volName := req.GetName()
size := req.GetCapacityRange().RequiredBytes
rs := req.GetParameters()["recordsize"]
@ -86,12 +78,12 @@ func (cs *controller) CreateVolume(
selected := scheduler(req.AccessibilityRequirements, schld, pool)
if len(selected) == 0 {
return nil, status.Error(codes.Internal, "scheduler failed")
return "", status.Error(codes.Internal, "scheduler failed")
}
logrus.Infof("scheduled the volume %s/%s on node %s", pool, volName, selected)
volObj, err := builder.NewBuilder().
volObj, err := volbuilder.NewBuilder().
WithName(volName).
WithCapacity(strconv.FormatInt(int64(size), 10)).
WithRecordSize(rs).
@ -108,12 +100,94 @@ func (cs *controller) CreateVolume(
WithCompression(compression).Build()
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
return "", status.Error(codes.Internal, err.Error())
}
err = zfs.ProvisionVolume(size, volObj)
err = zfs.ProvisionVolume(volObj)
if err != nil {
return nil, status.Error(codes.Internal, "not able to provision the volume")
return "", status.Error(codes.Internal,
"not able to provision the volume")
}
return selected, nil
}
func CreateZFSClone(req *csi.CreateVolumeRequest, snapshot string) (string, error) {
volName := req.GetName()
pool := req.GetParameters()["poolname"]
size := req.GetCapacityRange().RequiredBytes
volsize := strconv.FormatInt(int64(size), 10)
snapshotID := strings.Split(snapshot, "@")
if len(snapshotID) != 2 {
return "", status.Errorf(
codes.Internal,
"snap name is not valid %s, {%s}",
snapshot,
"invalid snapshot name",
)
}
snap, err := zfs.GetZFSSnapshot(snapshotID[1])
if err != nil {
return "", status.Error(codes.Internal, err.Error())
}
if snap.Spec.PoolName != pool {
return "", status.Errorf(codes.Internal,
"clone to a different pool src pool %s dst pool %s",
snap.Spec.PoolName, pool)
}
if snap.Spec.Capacity != volsize {
return "", status.Error(codes.Internal, "resize not supported")
}
selected := snap.Spec.OwnerNodeID
volObj, err := volbuilder.NewBuilder().
WithName(volName).Build()
volObj.Spec = snap.Spec
volObj.Spec.SnapName = snapshot
err = zfs.ProvisionVolume(volObj)
if err != nil {
return "", status.Error(codes.Internal,
"not able to provision the volume")
}
return selected, nil
}
// CreateVolume provisions a volume
func (cs *controller) CreateVolume(
ctx context.Context,
req *csi.CreateVolumeRequest,
) (*csi.CreateVolumeResponse, error) {
var err error
var selected string
volName := req.GetName()
size := req.GetCapacityRange().RequiredBytes
if err = cs.validateVolumeCreateReq(req); err != nil {
return nil, err
}
contentSource := req.GetVolumeContentSource()
if contentSource != nil && contentSource.GetSnapshot() != nil {
snapshotID := contentSource.GetSnapshot().GetSnapshotId()
selected, err = CreateZFSClone(req, snapshotID)
} else {
selected, err = CreateZFSVolume(req)
}
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
topology := map[string]string{zfs.ZFSTopologyKey: selected}
@ -122,6 +196,7 @@ func (cs *controller) CreateVolume(
WithName(volName).
WithCapacity(size).
WithTopology(topology).
WithContentSource(contentSource).
Build(), nil
}
@ -208,7 +283,65 @@ func (cs *controller) CreateSnapshot(
req *csi.CreateSnapshotRequest,
) (*csi.CreateSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
logrus.Infof("CreateSnapshot volume %s@%s", req.SourceVolumeId, req.Name)
snapTimeStamp := time.Now().Unix()
state, err := zfs.GetZFSSnapshotStatus(req.Name)
if err == nil {
return csipayload.NewCreateSnapshotResponseBuilder().
WithSourceVolumeID(req.SourceVolumeId).
WithSnapshotID(req.SourceVolumeId+"@"+req.Name).
WithCreationTime(snapTimeStamp, 0).
WithReadyToUse(state == zfs.ZFSStatusReady).
Build(), nil
}
vol, err := zfs.GetZFSVolume(req.SourceVolumeId)
if err != nil {
return nil, status.Errorf(
codes.Internal,
"CreateSnapshot not able to get volume %s: %s, {%s}",
req.SourceVolumeId, req.Name,
err.Error(),
)
}
labels := map[string]string{zfs.ZFSVolKey: vol.Name}
snapObj, err := snapbuilder.NewBuilder().
WithName(req.Name).
WithLabels(labels).Build()
if err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to create snapshotobject for %s: %s, {%s}",
req.SourceVolumeId, req.Name,
err.Error(),
)
}
snapObj.Spec = vol.Spec
snapObj.Status.State = zfs.ZFSStatusPending
if err := zfs.ProvisionSnapshot(snapObj); err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to handle CreateSnapshotRequest for %s: %s, {%s}",
req.SourceVolumeId, req.Name,
err.Error(),
)
}
state, _ = zfs.GetZFSSnapshotStatus(req.Name)
return csipayload.NewCreateSnapshotResponseBuilder().
WithSourceVolumeID(req.SourceVolumeId).
WithSnapshotID(req.SourceVolumeId+"@"+req.Name).
WithCreationTime(snapTimeStamp, 0).
WithReadyToUse(state == zfs.ZFSStatusReady).
Build(), nil
}
// DeleteSnapshot deletes given snapshot
@ -219,7 +352,28 @@ func (cs *controller) DeleteSnapshot(
req *csi.DeleteSnapshotRequest,
) (*csi.DeleteSnapshotResponse, error) {
return nil, status.Error(codes.Unimplemented, "")
logrus.Infof("DeleteSnapshot request for %s", req.SnapshotId)
// snapshodID is formed as <volname>@<snapname>
// parsing them here
snapshotID := strings.Split(req.SnapshotId, "@")
if len(snapshotID) != 2 {
return nil, status.Errorf(
codes.Internal,
"failed to handle DeleteSnapshot for %s, {%s}",
req.SnapshotId,
"failed to get the snapshot name, Manual intervention required",
)
}
if err := zfs.DeleteSnapshot(snapshotID[1]); err != nil {
return nil, status.Errorf(
codes.Internal,
"failed to handle DeleteSnapshot for %s, {%s}",
req.SnapshotId,
err.Error(),
)
}
return &csi.DeleteSnapshotResponse{}, nil
}
// ListSnapshots lists all snapshots for the
@ -346,6 +500,8 @@ func newControllerCapabilities() []*csi.ControllerServiceCapability {
var capabilities []*csi.ControllerServiceCapability
for _, cap := range []csi.ControllerServiceCapability_RPC_Type{
csi.ControllerServiceCapability_RPC_CREATE_DELETE_VOLUME,
csi.ControllerServiceCapability_RPC_CREATE_DELETE_SNAPSHOT,
csi.ControllerServiceCapability_RPC_CLONE_VOLUME,
} {
capabilities = append(capabilities, fromType(cap))
}

View file

@ -21,7 +21,7 @@ import (
"math"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/openebs/zfs-localpv/pkg/builder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
zfs "github.com/openebs/zfs-localpv/pkg/zfs"
@ -40,7 +40,7 @@ const (
func volumeWeightedScheduler(topo *csi.TopologyRequirement, pool string) string {
var selected string
zvlist, err := builder.NewKubeclient().
zvlist, err := volbuilder.NewKubeclient().
WithNamespace(zfs.OpenEBSNamespace).
List(metav1.ListOptions{})

View file

@ -27,6 +27,7 @@ import (
type OpenebsV1alpha1Interface interface {
RESTClient() rest.Interface
ZFSSnapshotsGetter
ZFSVolumesGetter
}
@ -35,6 +36,10 @@ type OpenebsV1alpha1Client struct {
restClient rest.Interface
}
func (c *OpenebsV1alpha1Client) ZFSSnapshots(namespace string) ZFSSnapshotInterface {
return newZFSSnapshots(c, namespace)
}
func (c *OpenebsV1alpha1Client) ZFSVolumes(namespace string) ZFSVolumeInterface {
return newZFSVolumes(c, namespace)
}

View file

@ -28,6 +28,10 @@ type FakeOpenebsV1alpha1 struct {
*testing.Fake
}
func (c *FakeOpenebsV1alpha1) ZFSSnapshots(namespace string) v1alpha1.ZFSSnapshotInterface {
return &FakeZFSSnapshots{c, namespace}
}
func (c *FakeOpenebsV1alpha1) ZFSVolumes(namespace string) v1alpha1.ZFSVolumeInterface {
return &FakeZFSVolumes{c, namespace}
}

View file

@ -0,0 +1,140 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package fake
import (
v1alpha1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
labels "k8s.io/apimachinery/pkg/labels"
schema "k8s.io/apimachinery/pkg/runtime/schema"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
testing "k8s.io/client-go/testing"
)
// FakeZFSSnapshots implements ZFSSnapshotInterface
type FakeZFSSnapshots struct {
Fake *FakeOpenebsV1alpha1
ns string
}
var zfssnapshotsResource = schema.GroupVersionResource{Group: "openebs.io", Version: "v1alpha1", Resource: "zfssnapshots"}
var zfssnapshotsKind = schema.GroupVersionKind{Group: "openebs.io", Version: "v1alpha1", Kind: "ZFSSnapshot"}
// Get takes name of the zFSSnapshot, and returns the corresponding zFSSnapshot object, and an error if there is any.
func (c *FakeZFSSnapshots) Get(name string, options v1.GetOptions) (result *v1alpha1.ZFSSnapshot, err error) {
obj, err := c.Fake.
Invokes(testing.NewGetAction(zfssnapshotsResource, c.ns, name), &v1alpha1.ZFSSnapshot{})
if obj == nil {
return nil, err
}
return obj.(*v1alpha1.ZFSSnapshot), err
}
// List takes label and field selectors, and returns the list of ZFSSnapshots that match those selectors.
func (c *FakeZFSSnapshots) List(opts v1.ListOptions) (result *v1alpha1.ZFSSnapshotList, err error) {
obj, err := c.Fake.
Invokes(testing.NewListAction(zfssnapshotsResource, zfssnapshotsKind, c.ns, opts), &v1alpha1.ZFSSnapshotList{})
if obj == nil {
return nil, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
list := &v1alpha1.ZFSSnapshotList{ListMeta: obj.(*v1alpha1.ZFSSnapshotList).ListMeta}
for _, item := range obj.(*v1alpha1.ZFSSnapshotList).Items {
if label.Matches(labels.Set(item.Labels)) {
list.Items = append(list.Items, item)
}
}
return list, err
}
// Watch returns a watch.Interface that watches the requested zFSSnapshots.
func (c *FakeZFSSnapshots) Watch(opts v1.ListOptions) (watch.Interface, error) {
return c.Fake.
InvokesWatch(testing.NewWatchAction(zfssnapshotsResource, c.ns, opts))
}
// Create takes the representation of a zFSSnapshot and creates it. Returns the server's representation of the zFSSnapshot, and an error, if there is any.
func (c *FakeZFSSnapshots) Create(zFSSnapshot *v1alpha1.ZFSSnapshot) (result *v1alpha1.ZFSSnapshot, err error) {
obj, err := c.Fake.
Invokes(testing.NewCreateAction(zfssnapshotsResource, c.ns, zFSSnapshot), &v1alpha1.ZFSSnapshot{})
if obj == nil {
return nil, err
}
return obj.(*v1alpha1.ZFSSnapshot), err
}
// Update takes the representation of a zFSSnapshot and updates it. Returns the server's representation of the zFSSnapshot, and an error, if there is any.
func (c *FakeZFSSnapshots) Update(zFSSnapshot *v1alpha1.ZFSSnapshot) (result *v1alpha1.ZFSSnapshot, err error) {
obj, err := c.Fake.
Invokes(testing.NewUpdateAction(zfssnapshotsResource, c.ns, zFSSnapshot), &v1alpha1.ZFSSnapshot{})
if obj == nil {
return nil, err
}
return obj.(*v1alpha1.ZFSSnapshot), err
}
// UpdateStatus was generated because the type contains a Status member.
// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
func (c *FakeZFSSnapshots) UpdateStatus(zFSSnapshot *v1alpha1.ZFSSnapshot) (*v1alpha1.ZFSSnapshot, error) {
obj, err := c.Fake.
Invokes(testing.NewUpdateSubresourceAction(zfssnapshotsResource, "status", c.ns, zFSSnapshot), &v1alpha1.ZFSSnapshot{})
if obj == nil {
return nil, err
}
return obj.(*v1alpha1.ZFSSnapshot), err
}
// Delete takes name of the zFSSnapshot and deletes it. Returns an error if one occurs.
func (c *FakeZFSSnapshots) Delete(name string, options *v1.DeleteOptions) error {
_, err := c.Fake.
Invokes(testing.NewDeleteAction(zfssnapshotsResource, c.ns, name), &v1alpha1.ZFSSnapshot{})
return err
}
// DeleteCollection deletes a collection of objects.
func (c *FakeZFSSnapshots) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
action := testing.NewDeleteCollectionAction(zfssnapshotsResource, c.ns, listOptions)
_, err := c.Fake.Invokes(action, &v1alpha1.ZFSSnapshotList{})
return err
}
// Patch applies the patch and returns the patched zFSSnapshot.
func (c *FakeZFSSnapshots) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ZFSSnapshot, err error) {
obj, err := c.Fake.
Invokes(testing.NewPatchSubresourceAction(zfssnapshotsResource, c.ns, name, pt, data, subresources...), &v1alpha1.ZFSSnapshot{})
if obj == nil {
return nil, err
}
return obj.(*v1alpha1.ZFSSnapshot), err
}

View file

@ -18,4 +18,6 @@ limitations under the License.
package v1alpha1
type ZFSSnapshotExpansion interface{}
type ZFSVolumeExpansion interface{}

View file

@ -0,0 +1,191 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by client-gen. DO NOT EDIT.
package v1alpha1
import (
"time"
v1alpha1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
scheme "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset/scheme"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
watch "k8s.io/apimachinery/pkg/watch"
rest "k8s.io/client-go/rest"
)
// ZFSSnapshotsGetter has a method to return a ZFSSnapshotInterface.
// A group's client should implement this interface.
type ZFSSnapshotsGetter interface {
ZFSSnapshots(namespace string) ZFSSnapshotInterface
}
// ZFSSnapshotInterface has methods to work with ZFSSnapshot resources.
type ZFSSnapshotInterface interface {
Create(*v1alpha1.ZFSSnapshot) (*v1alpha1.ZFSSnapshot, error)
Update(*v1alpha1.ZFSSnapshot) (*v1alpha1.ZFSSnapshot, error)
UpdateStatus(*v1alpha1.ZFSSnapshot) (*v1alpha1.ZFSSnapshot, error)
Delete(name string, options *v1.DeleteOptions) error
DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error
Get(name string, options v1.GetOptions) (*v1alpha1.ZFSSnapshot, error)
List(opts v1.ListOptions) (*v1alpha1.ZFSSnapshotList, error)
Watch(opts v1.ListOptions) (watch.Interface, error)
Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ZFSSnapshot, err error)
ZFSSnapshotExpansion
}
// zFSSnapshots implements ZFSSnapshotInterface
type zFSSnapshots struct {
client rest.Interface
ns string
}
// newZFSSnapshots returns a ZFSSnapshots
func newZFSSnapshots(c *OpenebsV1alpha1Client, namespace string) *zFSSnapshots {
return &zFSSnapshots{
client: c.RESTClient(),
ns: namespace,
}
}
// Get takes name of the zFSSnapshot, and returns the corresponding zFSSnapshot object, and an error if there is any.
func (c *zFSSnapshots) Get(name string, options v1.GetOptions) (result *v1alpha1.ZFSSnapshot, err error) {
result = &v1alpha1.ZFSSnapshot{}
err = c.client.Get().
Namespace(c.ns).
Resource("zfssnapshots").
Name(name).
VersionedParams(&options, scheme.ParameterCodec).
Do().
Into(result)
return
}
// List takes label and field selectors, and returns the list of ZFSSnapshots that match those selectors.
func (c *zFSSnapshots) List(opts v1.ListOptions) (result *v1alpha1.ZFSSnapshotList, err error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
result = &v1alpha1.ZFSSnapshotList{}
err = c.client.Get().
Namespace(c.ns).
Resource("zfssnapshots").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Do().
Into(result)
return
}
// Watch returns a watch.Interface that watches the requested zFSSnapshots.
func (c *zFSSnapshots) Watch(opts v1.ListOptions) (watch.Interface, error) {
var timeout time.Duration
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
opts.Watch = true
return c.client.Get().
Namespace(c.ns).
Resource("zfssnapshots").
VersionedParams(&opts, scheme.ParameterCodec).
Timeout(timeout).
Watch()
}
// Create takes the representation of a zFSSnapshot and creates it. Returns the server's representation of the zFSSnapshot, and an error, if there is any.
func (c *zFSSnapshots) Create(zFSSnapshot *v1alpha1.ZFSSnapshot) (result *v1alpha1.ZFSSnapshot, err error) {
result = &v1alpha1.ZFSSnapshot{}
err = c.client.Post().
Namespace(c.ns).
Resource("zfssnapshots").
Body(zFSSnapshot).
Do().
Into(result)
return
}
// Update takes the representation of a zFSSnapshot and updates it. Returns the server's representation of the zFSSnapshot, and an error, if there is any.
func (c *zFSSnapshots) Update(zFSSnapshot *v1alpha1.ZFSSnapshot) (result *v1alpha1.ZFSSnapshot, err error) {
result = &v1alpha1.ZFSSnapshot{}
err = c.client.Put().
Namespace(c.ns).
Resource("zfssnapshots").
Name(zFSSnapshot.Name).
Body(zFSSnapshot).
Do().
Into(result)
return
}
// UpdateStatus was generated because the type contains a Status member.
// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
func (c *zFSSnapshots) UpdateStatus(zFSSnapshot *v1alpha1.ZFSSnapshot) (result *v1alpha1.ZFSSnapshot, err error) {
result = &v1alpha1.ZFSSnapshot{}
err = c.client.Put().
Namespace(c.ns).
Resource("zfssnapshots").
Name(zFSSnapshot.Name).
SubResource("status").
Body(zFSSnapshot).
Do().
Into(result)
return
}
// Delete takes name of the zFSSnapshot and deletes it. Returns an error if one occurs.
func (c *zFSSnapshots) Delete(name string, options *v1.DeleteOptions) error {
return c.client.Delete().
Namespace(c.ns).
Resource("zfssnapshots").
Name(name).
Body(options).
Do().
Error()
}
// DeleteCollection deletes a collection of objects.
func (c *zFSSnapshots) DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error {
var timeout time.Duration
if listOptions.TimeoutSeconds != nil {
timeout = time.Duration(*listOptions.TimeoutSeconds) * time.Second
}
return c.client.Delete().
Namespace(c.ns).
Resource("zfssnapshots").
VersionedParams(&listOptions, scheme.ParameterCodec).
Timeout(timeout).
Body(options).
Do().
Error()
}
// Patch applies the patch and returns the patched zFSSnapshot.
func (c *zFSSnapshots) Patch(name string, pt types.PatchType, data []byte, subresources ...string) (result *v1alpha1.ZFSSnapshot, err error) {
result = &v1alpha1.ZFSSnapshot{}
err = c.client.Patch(pt).
Namespace(c.ns).
Resource("zfssnapshots").
SubResource(subresources...).
Name(name).
Body(data).
Do().
Into(result)
return
}

View file

@ -24,6 +24,8 @@ import (
// Interface provides access to all the informers in this group version.
type Interface interface {
// ZFSSnapshots returns a ZFSSnapshotInformer.
ZFSSnapshots() ZFSSnapshotInformer
// ZFSVolumes returns a ZFSVolumeInformer.
ZFSVolumes() ZFSVolumeInformer
}
@ -39,6 +41,11 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList
return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions}
}
// ZFSSnapshots returns a ZFSSnapshotInformer.
func (v *version) ZFSSnapshots() ZFSSnapshotInformer {
return &zFSSnapshotInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}
}
// ZFSVolumes returns a ZFSVolumeInformer.
func (v *version) ZFSVolumes() ZFSVolumeInformer {
return &zFSVolumeInformer{factory: v.factory, namespace: v.namespace, tweakListOptions: v.tweakListOptions}

View file

@ -0,0 +1,89 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by informer-gen. DO NOT EDIT.
package v1alpha1
import (
time "time"
corev1alpha1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
internalclientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset"
internalinterfaces "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions/internalinterfaces"
v1alpha1 "github.com/openebs/zfs-localpv/pkg/generated/lister/core/v1alpha1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
runtime "k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
cache "k8s.io/client-go/tools/cache"
)
// ZFSSnapshotInformer provides access to a shared informer and lister for
// ZFSSnapshots.
type ZFSSnapshotInformer interface {
Informer() cache.SharedIndexInformer
Lister() v1alpha1.ZFSSnapshotLister
}
type zFSSnapshotInformer struct {
factory internalinterfaces.SharedInformerFactory
tweakListOptions internalinterfaces.TweakListOptionsFunc
namespace string
}
// NewZFSSnapshotInformer constructs a new informer for ZFSSnapshot type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewZFSSnapshotInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
return NewFilteredZFSSnapshotInformer(client, namespace, resyncPeriod, indexers, nil)
}
// NewFilteredZFSSnapshotInformer constructs a new informer for ZFSSnapshot type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredZFSSnapshotInformer(client internalclientset.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
return cache.NewSharedIndexInformer(
&cache.ListWatch{
ListFunc: func(options v1.ListOptions) (runtime.Object, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.OpenebsV1alpha1().ZFSSnapshots(namespace).List(options)
},
WatchFunc: func(options v1.ListOptions) (watch.Interface, error) {
if tweakListOptions != nil {
tweakListOptions(&options)
}
return client.OpenebsV1alpha1().ZFSSnapshots(namespace).Watch(options)
},
},
&corev1alpha1.ZFSSnapshot{},
resyncPeriod,
indexers,
)
}
func (f *zFSSnapshotInformer) defaultInformer(client internalclientset.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return NewFilteredZFSSnapshotInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}
func (f *zFSSnapshotInformer) Informer() cache.SharedIndexInformer {
return f.factory.InformerFor(&corev1alpha1.ZFSSnapshot{}, f.defaultInformer)
}
func (f *zFSSnapshotInformer) Lister() v1alpha1.ZFSSnapshotLister {
return v1alpha1.NewZFSSnapshotLister(f.Informer().GetIndexer())
}

View file

@ -53,6 +53,8 @@ func (f *genericInformer) Lister() cache.GenericLister {
func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
switch resource {
// Group=openebs.io, Version=v1alpha1
case v1alpha1.SchemeGroupVersion.WithResource("zfssnapshots"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Openebs().V1alpha1().ZFSSnapshots().Informer()}, nil
case v1alpha1.SchemeGroupVersion.WithResource("zfsvolumes"):
return &genericInformer{resource: resource.GroupResource(), informer: f.Openebs().V1alpha1().ZFSVolumes().Informer()}, nil

View file

@ -18,6 +18,14 @@ limitations under the License.
package v1alpha1
// ZFSSnapshotListerExpansion allows custom methods to be added to
// ZFSSnapshotLister.
type ZFSSnapshotListerExpansion interface{}
// ZFSSnapshotNamespaceListerExpansion allows custom methods to be added to
// ZFSSnapshotNamespaceLister.
type ZFSSnapshotNamespaceListerExpansion interface{}
// ZFSVolumeListerExpansion allows custom methods to be added to
// ZFSVolumeLister.
type ZFSVolumeListerExpansion interface{}

View file

@ -0,0 +1,94 @@
/*
Copyright 2019 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by lister-gen. DO NOT EDIT.
package v1alpha1
import (
v1alpha1 "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/tools/cache"
)
// ZFSSnapshotLister helps list ZFSSnapshots.
type ZFSSnapshotLister interface {
// List lists all ZFSSnapshots in the indexer.
List(selector labels.Selector) (ret []*v1alpha1.ZFSSnapshot, err error)
// ZFSSnapshots returns an object that can list and get ZFSSnapshots.
ZFSSnapshots(namespace string) ZFSSnapshotNamespaceLister
ZFSSnapshotListerExpansion
}
// zFSSnapshotLister implements the ZFSSnapshotLister interface.
type zFSSnapshotLister struct {
indexer cache.Indexer
}
// NewZFSSnapshotLister returns a new ZFSSnapshotLister.
func NewZFSSnapshotLister(indexer cache.Indexer) ZFSSnapshotLister {
return &zFSSnapshotLister{indexer: indexer}
}
// List lists all ZFSSnapshots in the indexer.
func (s *zFSSnapshotLister) List(selector labels.Selector) (ret []*v1alpha1.ZFSSnapshot, err error) {
err = cache.ListAll(s.indexer, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.ZFSSnapshot))
})
return ret, err
}
// ZFSSnapshots returns an object that can list and get ZFSSnapshots.
func (s *zFSSnapshotLister) ZFSSnapshots(namespace string) ZFSSnapshotNamespaceLister {
return zFSSnapshotNamespaceLister{indexer: s.indexer, namespace: namespace}
}
// ZFSSnapshotNamespaceLister helps list and get ZFSSnapshots.
type ZFSSnapshotNamespaceLister interface {
// List lists all ZFSSnapshots in the indexer for a given namespace.
List(selector labels.Selector) (ret []*v1alpha1.ZFSSnapshot, err error)
// Get retrieves the ZFSSnapshot from the indexer for a given namespace and name.
Get(name string) (*v1alpha1.ZFSSnapshot, error)
ZFSSnapshotNamespaceListerExpansion
}
// zFSSnapshotNamespaceLister implements the ZFSSnapshotNamespaceLister
// interface.
type zFSSnapshotNamespaceLister struct {
indexer cache.Indexer
namespace string
}
// List lists all ZFSSnapshots in the indexer for a given namespace.
func (s zFSSnapshotNamespaceLister) List(selector labels.Selector) (ret []*v1alpha1.ZFSSnapshot, err error) {
err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) {
ret = append(ret, m.(*v1alpha1.ZFSSnapshot))
})
return ret, err
}
// Get retrieves the ZFSSnapshot from the indexer for a given namespace and name.
func (s zFSSnapshotNamespaceLister) Get(name string) (*v1alpha1.ZFSSnapshot, error) {
obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name)
if err != nil {
return nil, err
}
if !exists {
return nil, errors.NewNotFound(v1alpha1.Resource("zfssnapshot"), name)
}
return obj.(*v1alpha1.ZFSSnapshot), nil
}

View file

@ -0,0 +1,136 @@
/*
Copyright 2020 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshot
import (
"github.com/Sirupsen/logrus"
clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset"
openebsScheme "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset/scheme"
informers "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions"
listers "github.com/openebs/zfs-localpv/pkg/generated/lister/core/v1alpha1"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
)
const controllerAgentName = "zfssnap-controller"
// SnapController is the controller implementation for Snap resources
type SnapController struct {
// kubeclientset is a standard kubernetes clientset
kubeclientset kubernetes.Interface
// clientset is a openebs custom resource package generated for custom API group.
clientset clientset.Interface
snapLister listers.ZFSSnapshotLister
// snapSynced is used for caches sync to get populated
snapSynced cache.InformerSynced
// workqueue is a rate limited work queue. This is used to queue work to be
// processed instead of performing it as soon as a change happens. This
// means we can ensure we only process a fixed amount of resources at a
// time, and makes it easy to ensure we are never processing the same item
// simultaneously in two different workers.
workqueue workqueue.RateLimitingInterface
// recorder is an event recorder for recording Event resources to the
// Kubernetes API.
recorder record.EventRecorder
}
// SnapControllerBuilder is the builder object for controller.
type SnapControllerBuilder struct {
SnapController *SnapController
}
// NewSnapControllerBuilder returns an empty instance of controller builder.
func NewSnapControllerBuilder() *SnapControllerBuilder {
return &SnapControllerBuilder{
SnapController: &SnapController{},
}
}
// withKubeClient fills kube client to controller object.
func (cb *SnapControllerBuilder) withKubeClient(ks kubernetes.Interface) *SnapControllerBuilder {
cb.SnapController.kubeclientset = ks
return cb
}
// withOpenEBSClient fills openebs client to controller object.
func (cb *SnapControllerBuilder) withOpenEBSClient(cs clientset.Interface) *SnapControllerBuilder {
cb.SnapController.clientset = cs
return cb
}
// withSnapLister fills snap lister to controller object.
func (cb *SnapControllerBuilder) withSnapLister(sl informers.SharedInformerFactory) *SnapControllerBuilder {
snapInformer := sl.Openebs().V1alpha1().ZFSSnapshots()
cb.SnapController.snapLister = snapInformer.Lister()
return cb
}
// withSnapSynced adds object sync information in cache to controller object.
func (cb *SnapControllerBuilder) withSnapSynced(sl informers.SharedInformerFactory) *SnapControllerBuilder {
snapInformer := sl.Openebs().V1alpha1().ZFSSnapshots()
cb.SnapController.snapSynced = snapInformer.Informer().HasSynced
return cb
}
// withWorkqueue adds workqueue to controller object.
func (cb *SnapControllerBuilder) withWorkqueueRateLimiting() *SnapControllerBuilder {
cb.SnapController.workqueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Snap")
return cb
}
// withRecorder adds recorder to controller object.
func (cb *SnapControllerBuilder) withRecorder(ks kubernetes.Interface) *SnapControllerBuilder {
logrus.Infof("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(logrus.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: ks.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
cb.SnapController.recorder = recorder
return cb
}
// withEventHandler adds event handlers controller object.
func (cb *SnapControllerBuilder) withEventHandler(cvcInformerFactory informers.SharedInformerFactory) *SnapControllerBuilder {
cvcInformer := cvcInformerFactory.Openebs().V1alpha1().ZFSSnapshots()
// Set up an event handler for when Snap resources change
cvcInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: cb.SnapController.addSnap,
UpdateFunc: cb.SnapController.updateSnap,
DeleteFunc: cb.SnapController.deleteSnap,
})
return cb
}
// Build returns a controller instance.
func (cb *SnapControllerBuilder) Build() (*SnapController, error) {
err := openebsScheme.AddToScheme(scheme.Scheme)
if err != nil {
return nil, err
}
return cb.SnapController, nil
}

View file

@ -0,0 +1,248 @@
/*
Copyright 2020 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshot
import (
"fmt"
"time"
"github.com/Sirupsen/logrus"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
zfs "github.com/openebs/zfs-localpv/pkg/zfs"
k8serror "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
)
// isDeletionCandidate checks if a zfs snapshot is a deletion candidate.
func (c *SnapController) isDeletionCandidate(snap *apis.ZFSSnapshot) bool {
return snap.ObjectMeta.DeletionTimestamp != nil
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two.
func (c *SnapController) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
// Get the snap resource with this namespace/name
snap, err := c.snapLister.ZFSSnapshots(namespace).Get(name)
if k8serror.IsNotFound(err) {
runtime.HandleError(fmt.Errorf("zfs snapshot '%s' has been deleted", key))
return nil
}
if err != nil {
return err
}
snapCopy := snap.DeepCopy()
err = c.syncSnap(snapCopy)
return err
}
// enqueueSnap takes a ZFSSnapshot resource and converts it into a namespace/name
// string which is then put onto the work queue. This method should *not* be
// passed resources of any type other than ZFSSnapshot.
func (c *SnapController) enqueueSnap(obj interface{}) {
var key string
var err error
if key, err = cache.MetaNamespaceKeyFunc(obj); err != nil {
runtime.HandleError(err)
return
}
c.workqueue.Add(key)
}
// synSnap is the function which tries to converge to a desired state for the
// ZFSSnapshot
func (c *SnapController) syncSnap(snap *apis.ZFSSnapshot) error {
var err error
// ZFSSnapshot should be deleted. Check if deletion timestamp is set
if c.isDeletionCandidate(snap) {
err = zfs.DestroySnapshot(snap)
if err == nil {
zfs.RemoveSnapFinalizer(snap)
}
} else {
// if finalizer is not set then it means we are creating
// the zfs snapshot.
if snap.Finalizers == nil {
err = zfs.CreateSnapshot(snap)
if err == nil {
err = zfs.UpdateSnapInfo(snap)
}
}
}
return err
}
// addSnap is the add event handler for ZFSSnapshot
func (c *SnapController) addSnap(obj interface{}) {
snap, ok := obj.(*apis.ZFSSnapshot)
if !ok {
runtime.HandleError(fmt.Errorf("Couldn't get snap object %#v", obj))
return
}
if zfs.NodeID != snap.Spec.OwnerNodeID {
return
}
logrus.Infof("Got add event for Snap %s/%s", snap.Spec.PoolName, snap.Name)
c.enqueueSnap(snap)
}
// updateSnap is the update event handler for ZFSSnapshot
func (c *SnapController) updateSnap(oldObj, newObj interface{}) {
newSnap, ok := newObj.(*apis.ZFSSnapshot)
if !ok {
runtime.HandleError(fmt.Errorf("Couldn't get snap object %#v", newSnap))
return
}
if zfs.NodeID != newSnap.Spec.OwnerNodeID {
return
}
// update on Snapshot CR does not make sense unless it is a deletion candidate
if c.isDeletionCandidate(newSnap) {
logrus.Infof("Got update event for Snap %s/%s@%s", newSnap.Spec.PoolName, newSnap.Labels[zfs.ZFSVolKey], newSnap.Name)
c.enqueueSnap(newSnap)
}
}
// deleteSnap is the delete event handler for ZFSSnapshot
func (c *SnapController) deleteSnap(obj interface{}) {
snap, ok := obj.(*apis.ZFSSnapshot)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
runtime.HandleError(fmt.Errorf("Couldn't get object from tombstone %#v", obj))
return
}
snap, ok = tombstone.Obj.(*apis.ZFSSnapshot)
if !ok {
runtime.HandleError(fmt.Errorf("Tombstone contained object that is not a zfssnap %#v", obj))
return
}
}
if zfs.NodeID != snap.Spec.OwnerNodeID {
return
}
logrus.Infof("Got delete event for Snap %s/%s@%s", snap.Spec.PoolName, snap.Labels[zfs.ZFSVolKey], snap.Name)
c.enqueueSnap(snap)
}
// Run will set up the event handlers for types we are interested in, as well
// as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items.
func (c *SnapController) Run(threadiness int, stopCh <-chan struct{}) error {
defer runtime.HandleCrash()
defer c.workqueue.ShutDown()
// Start the informer factories to begin populating the informer caches
logrus.Info("Starting Snap controller")
// Wait for the k8s caches to be synced before starting workers
logrus.Info("Waiting for informer caches to sync")
if ok := cache.WaitForCacheSync(stopCh, c.snapSynced); !ok {
return fmt.Errorf("failed to wait for caches to sync")
}
logrus.Info("Starting Snap workers")
// Launch worker to process Snap resources
// Threadiness will decide the number of workers you want to launch to process work items from queue
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
logrus.Info("Started Snap workers")
<-stopCh
logrus.Info("Shutting down Snap workers")
return nil
}
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (c *SnapController) runWorker() {
for c.processNextWorkItem() {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (c *SnapController) processNextWorkItem() bool {
obj, shutdown := c.workqueue.Get()
if shutdown {
return false
}
// We wrap this block in a func so we can defer c.workqueue.Done.
err := func(obj interface{}) error {
// We call Done here so the workqueue knows we have finished
// processing this item. We also must remember to call Forget if we
// do not want this work item being re-queued. For example, we do
// not call Forget if a transient error occurs, instead the item is
// put back on the workqueue and attempted again after a back-off
// period.
defer c.workqueue.Done(obj)
var key string
var ok bool
// We expect strings to come off the workqueue. These are of the
// form namespace/name. We do this as the delayed nature of the
// workqueue means the items in the informer cache may actually be
// more up to date that when the item was initially put onto the
// workqueue.
if key, ok = obj.(string); !ok {
// As the item in the workqueue is actually invalid, we call
// Forget here else we'd go into a loop of attempting to
// process a work item that is invalid.
c.workqueue.Forget(obj)
runtime.HandleError(fmt.Errorf("expected string in workqueue but got %#v", obj))
return nil
}
// Run the syncHandler, passing it the namespace/name string of the
// Snap resource to be synced.
if err := c.syncHandler(key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
c.workqueue.AddRateLimited(key)
return fmt.Errorf("error syncing '%s': %s, requeuing", key, err.Error())
}
// Finally, if no error occurs we Forget this item so it does not
// get queued again until another change happens.
c.workqueue.Forget(obj)
logrus.Infof("Successfully synced '%s'", key)
return nil
}(obj)
if err != nil {
runtime.HandleError(err)
return true
}
return true
}

107
pkg/mgmt/snapshot/start.go Normal file
View file

@ -0,0 +1,107 @@
/*
Copyright 2020 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package snapshot
import (
"sync"
"github.com/Sirupsen/logrus"
"github.com/pkg/errors"
"time"
clientset "github.com/openebs/zfs-localpv/pkg/generated/clientset/internalclientset"
informers "github.com/openebs/zfs-localpv/pkg/generated/informer/externalversions"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)
var (
masterURL string
kubeconfig string
)
// Start starts the zfssnapshot controller.
func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error {
// Get in cluster config
cfg, err := getClusterConfig(kubeconfig)
if err != nil {
return errors.Wrap(err, "error building kubeconfig")
}
// Building Kubernetes Clientset
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
return errors.Wrap(err, "error building kubernetes clientset")
}
// Building OpenEBS Clientset
openebsClient, err := clientset.NewForConfig(cfg)
if err != nil {
return errors.Wrap(err, "error building openebs clientset")
}
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
snapInformerFactory := informers.NewSharedInformerFactory(openebsClient, time.Second*30)
// Build() fn of all controllers calls AddToScheme to adds all types of this
// clientset into the given scheme.
// If multiple controllers happen to call this AddToScheme same time,
// it causes panic with error saying concurrent map access.
// This lock is used to serialize the AddToScheme call of all controllers.
controllerMtx.Lock()
controller, err := NewSnapControllerBuilder().
withKubeClient(kubeClient).
withOpenEBSClient(openebsClient).
withSnapSynced(snapInformerFactory).
withSnapLister(snapInformerFactory).
withRecorder(kubeClient).
withEventHandler(snapInformerFactory).
withWorkqueueRateLimiting().Build()
// blocking call, can't use defer to release the lock
controllerMtx.Unlock()
if err != nil {
return errors.Wrapf(err, "error building controller instance")
}
go kubeInformerFactory.Start(stopCh)
go snapInformerFactory.Start(stopCh)
// Threadiness defines the number of workers to be launched in Run function
return controller.Run(2, stopCh)
}
// GetClusterConfig return the config for k8s.
func getClusterConfig(kubeconfig string) (*rest.Config, error) {
cfg, err := rest.InClusterConfig()
if err != nil {
logrus.Errorf("Failed to get k8s Incluster config. %+v", err)
if kubeconfig == "" {
return nil, errors.Wrap(err, "kubeconfig is empty")
}
cfg, err = clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
return nil, errors.Wrap(err, "error building kubeconfig")
}
}
return cfg, err
}

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package mgmt
package volume
import (
"github.com/Sirupsen/logrus"

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package mgmt
package volume
import (
"sync"
@ -30,7 +30,6 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"sigs.k8s.io/controller-runtime/pkg/runtime/signals"
)
var (
@ -39,10 +38,7 @@ var (
)
// Start starts the zfsvolume controller.
func Start(controllerMtx *sync.RWMutex) error {
// set up signals so we handle the first shutdown signal gracefully
stopCh := signals.SetupSignalHandler()
func Start(controllerMtx *sync.RWMutex, stopCh <-chan struct{}) error {
// Get in cluster config
cfg, err := getClusterConfig(kubeconfig)
if err != nil {

View file

@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package mgmt
package volume
import (
"fmt"
@ -36,8 +36,7 @@ func (c *ZVController) isDeletionCandidate(zv *apis.ZFSVolume) bool {
}
// syncHandler compares the actual state with the desired, and attempts to
// converge the two. It then updates the Status block of the spcPoolUpdated resource
// with the current status of the resource.
// converge the two.
func (c *ZVController) syncHandler(key string) error {
// Convert the namespace/name string into a distinct namespace and name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
@ -90,8 +89,12 @@ func (c *ZVController) syncZV(zv *apis.ZFSVolume) error {
// created and this event is for property change only.
if zv.Finalizers != nil {
err = zfs.SetVolumeProp(zv)
} else {
if len(zv.Spec.SnapName) > 0 {
err = zfs.CreateClone(zv)
} else {
err = zfs.CreateVolume(zv)
}
if err == nil {
err = zfs.UpdateZvolInfo(zv)
}

View file

@ -57,6 +57,13 @@ func (b *CreateVolumeResponseBuilder) WithContext(ctx map[string]string) *Create
return b
}
// WithContentSource sets the contentSource against the
// CreateVolumeResponse instance
func (b *CreateVolumeResponseBuilder) WithContentSource(cnt *csi.VolumeContentSource) *CreateVolumeResponseBuilder {
b.response.Volume.ContentSource = cnt
return b
}
// WithTopology sets the topology for the
// CreateVolumeResponse instance
func (b *CreateVolumeResponseBuilder) WithTopology(topology map[string]string) *CreateVolumeResponseBuilder {

82
pkg/response/snapshot.go Normal file
View file

@ -0,0 +1,82 @@
/*
Copyright © 2020 The OpenEBS Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package v1alpha1
import (
"github.com/container-storage-interface/spec/lib/go/csi"
timestamp "github.com/golang/protobuf/ptypes/timestamp"
)
// CreateSnapshotResponseBuilder helps building an
// instance of csi CreateVolumeResponse
type CreateSnapshotResponseBuilder struct {
response *csi.CreateSnapshotResponse
}
// NewCreateSnapshotResponseBuilder returns a new
// instance of CreateSnapshotResponseBuilder
func NewCreateSnapshotResponseBuilder() *CreateSnapshotResponseBuilder {
return &CreateSnapshotResponseBuilder{
response: &csi.CreateSnapshotResponse{
Snapshot: &csi.Snapshot{},
},
}
}
// WithSize sets the size against the
// CreateSnapshotResponse instance
func (b *CreateSnapshotResponseBuilder) WithSize(size int64) *CreateSnapshotResponseBuilder {
b.response.Snapshot.SizeBytes = size
return b
}
// WithSnapshotID sets the snapshotID against the
// CreateSnapshotResponse instance
func (b *CreateSnapshotResponseBuilder) WithSnapshotID(snapshotID string) *CreateSnapshotResponseBuilder {
b.response.Snapshot.SnapshotId = snapshotID
return b
}
// WithSourceVolumeID sets the sourceVolumeID against the
// CreateSnapshotResponse instance
func (b *CreateSnapshotResponseBuilder) WithSourceVolumeID(volumeID string) *CreateSnapshotResponseBuilder {
b.response.Snapshot.SourceVolumeId = volumeID
return b
}
// WithCreationTime sets the creationTime against the
// CreateSnapshotResponse instance
func (b *CreateSnapshotResponseBuilder) WithCreationTime(tsec, tnsec int64) *CreateSnapshotResponseBuilder {
b.response.Snapshot.CreationTime = &timestamp.Timestamp{
Seconds: tsec,
Nanos: int32(tnsec),
}
return b
}
// WithReadyToUse sets the readyToUse feild against the
// CreateSnapshotResponse instance
func (b *CreateSnapshotResponseBuilder) WithReadyToUse(readyToUse bool) *CreateSnapshotResponseBuilder {
b.response.Snapshot.ReadyToUse = readyToUse
return b
}
// Build returns the constructed instance
// of csi CreateSnapshotResponse
func (b *CreateSnapshotResponseBuilder) Build() *csi.CreateSnapshotResponse {
return b.response
}

View file

@ -19,7 +19,8 @@ import (
"os"
apis "github.com/openebs/zfs-localpv/pkg/apis/openebs.io/core/v1alpha1"
"github.com/openebs/zfs-localpv/pkg/builder"
"github.com/openebs/zfs-localpv/pkg/builder/snapbuilder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -31,10 +32,16 @@ const (
OpenEBSNamespaceKey string = "OPENEBS_NAMESPACE"
// ZFSFinalizer for the ZfsVolume CR
ZFSFinalizer string = "zfs.openebs.io/finalizer"
// ZFSVolKey for the ZfsSnapshot CR to store Persistence Volume name
ZFSVolKey string = "openebs.io/persistent-volume"
// ZFSNodeKey will be used to insert Label in ZfsVolume CR
ZFSNodeKey string = "kubernetes.io/nodename"
// ZFSTopologyKey is supported topology key for the zfs driver
ZFSTopologyKey string = "kubernetes.io/hostname"
// ZFSStatusPending shows object has not handled yet
ZFSStatusPending string = "Pending"
// ZFSStatusReady shows object has been processed
ZFSStatusReady string = "Ready"
)
var (
@ -60,11 +67,10 @@ func init() {
// ProvisionVolume creates a ZFSVolume(zv) CR,
// watcher for zvc is present in CSI agent
func ProvisionVolume(
size int64,
vol *apis.ZFSVolume,
) error {
_, err := builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Create(vol)
_, err := volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Create(vol)
if err == nil {
logrus.Infof("provisioned volume %s", vol.Name)
}
@ -72,16 +78,40 @@ func ProvisionVolume(
return err
}
// ProvisionSnapshot creates a ZFSSnapshot CR,
// watcher for zvc is present in CSI agent
func ProvisionSnapshot(
snap *apis.ZFSSnapshot,
) error {
_, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Create(snap)
if err == nil {
logrus.Infof("provisioned snapshot %s", snap.Name)
}
return err
}
// DeleteSnapshot deletes the corresponding ZFSSnapshot CR
func DeleteSnapshot(snapname string) (err error) {
err = snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(snapname)
if err == nil {
logrus.Infof("deprovisioned snapshot %s", snapname)
}
return
}
// GetVolume the corresponding ZFSVolume CR
func GetVolume(volumeID string) (*apis.ZFSVolume, error) {
return builder.NewKubeclient().
return volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).
Get(volumeID, metav1.GetOptions{})
}
// DeleteVolume deletes the corresponding ZFSVol CR
func DeleteVolume(volumeID string) (err error) {
err = builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(volumeID)
err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(volumeID)
if err == nil {
logrus.Infof("deprovisioned volume %s", volumeID)
}
@ -95,15 +125,15 @@ func GetVolList(volumeID string) (*apis.ZFSVolumeList, error) {
LabelSelector: ZFSNodeKey + "=" + NodeID,
}
return builder.NewKubeclient().
return volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).List(listOptions)
}
// GetZFSVolume fetches the current Published csi Volume
// GetZFSVolume fetches the given ZFSVolume
func GetZFSVolume(volumeID string) (*apis.ZFSVolume, error) {
getOptions := metav1.GetOptions{}
vol, err := builder.NewKubeclient().
vol, err := volbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).Get(volumeID, getOptions)
return vol, err
}
@ -117,7 +147,7 @@ func UpdateZvolInfo(vol *apis.ZFSVolume) error {
return nil
}
newVol, err := builder.BuildFrom(vol).
newVol, err := volbuilder.BuildFrom(vol).
WithFinalizer(finalizers).
WithLabels(labels).Build()
@ -125,7 +155,7 @@ func UpdateZvolInfo(vol *apis.ZFSVolume) error {
return err
}
_, err = builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newVol)
_, err = volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newVol)
return err
}
@ -133,6 +163,61 @@ func UpdateZvolInfo(vol *apis.ZFSVolume) error {
func RemoveZvolFinalizer(vol *apis.ZFSVolume) error {
vol.Finalizers = nil
_, err := builder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(vol)
_, err := volbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(vol)
return err
}
// GetZFSSnapshot fetches the given ZFSSnapshot
func GetZFSSnapshot(snapID string) (*apis.ZFSSnapshot, error) {
getOptions := metav1.GetOptions{}
snap, err := snapbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).Get(snapID, getOptions)
return snap, err
}
// GetZFSSnapshotStatus returns ZFSSnapshot status
func GetZFSSnapshotStatus(snapID string) (string, error) {
getOptions := metav1.GetOptions{}
snap, err := snapbuilder.NewKubeclient().
WithNamespace(OpenEBSNamespace).Get(snapID, getOptions)
if err != nil {
logrus.Errorf("Get snapshot failed %s err: %s", snap.Name, err.Error())
return "", err
}
return snap.Status.State, nil
}
// UpdateSnapInfo updates ZFSSnapshot CR with node id and finalizer
func UpdateSnapInfo(snap *apis.ZFSSnapshot) error {
finalizers := []string{ZFSFinalizer}
labels := map[string]string{ZFSNodeKey: NodeID}
if snap.Finalizers != nil {
return nil
}
newSnap, err := snapbuilder.BuildFrom(snap).
WithFinalizer(finalizers).
WithLabels(labels).Build()
// set the status to ready
newSnap.Status.State = ZFSStatusReady
if err != nil {
logrus.Errorf("Update snapshot failed %s err: %s", snap.Name, err.Error())
return err
}
_, err = snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(newSnap)
return err
}
// RemoveSnapFinalizer adds finalizer to ZFSSnapshot CR
func RemoveSnapFinalizer(snap *apis.ZFSSnapshot) error {
snap.Finalizers = nil
_, err := snapbuilder.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(snap)
return err
}

View file

@ -34,10 +34,12 @@ const (
const (
ZFSVolCmd = "zfs"
ZFSCreateArg = "create"
ZFSCloneArg = "clone"
ZFSDestroyArg = "destroy"
ZFSSetArg = "set"
ZFSGetArg = "get"
ZFSListArg = "list"
ZFSSnapshotArg = "snapshot"
)
// constants to define volume type
@ -115,6 +117,49 @@ func buildZvolCreateArgs(vol *apis.ZFSVolume) []string {
return ZFSVolArg
}
// builldCloneCreateArgs returns zfs clone commands for zfs volume/dataset along with attributes as a string array
func buildCloneCreateArgs(vol *apis.ZFSVolume) []string {
var ZFSVolArg []string
volume := vol.Spec.PoolName + "/" + vol.Name
snapshot := vol.Spec.PoolName + "/" + vol.Spec.SnapName
ZFSVolArg = append(ZFSVolArg, ZFSCloneArg)
if vol.Spec.VolumeType == VOLTYPE_DATASET {
ZFSVolArg = append(ZFSVolArg, "-o", "mountpoint=none")
}
ZFSVolArg = append(ZFSVolArg, snapshot, volume)
return ZFSVolArg
}
// buildZFSSnapCreateArgs returns zfs create command for zfs snapshot
// zfs snapshot <poolname>/<volname>@<snapname>
func buildZFSSnapCreateArgs(snap *apis.ZFSSnapshot) []string {
var ZFSSnapArg []string
volname := snap.Labels[ZFSVolKey]
snapDataset := snap.Spec.PoolName + "/" + volname + "@" + snap.Name
ZFSSnapArg = append(ZFSSnapArg, ZFSSnapshotArg, snapDataset)
return ZFSSnapArg
}
// builldZFSSnapDestroyArgs returns zfs destroy command for zfs snapshot
// zfs destroy <poolname>/<volname>@<snapname>
func buildZFSSnapDestroyArgs(snap *apis.ZFSSnapshot) []string {
var ZFSSnapArg []string
volname := snap.Labels[ZFSVolKey]
snapDataset := snap.Spec.PoolName + "/" + volname + "@" + snap.Name
ZFSSnapArg = append(ZFSSnapArg, ZFSDestroyArg, snapDataset)
return ZFSSnapArg
}
// builldDatasetCreateArgs returns zfs create command for dataset along with attributes as a string array
func buildDatasetCreateArgs(vol *apis.ZFSVolume) []string {
var ZFSVolArg []string
@ -241,6 +286,31 @@ func CreateVolume(vol *apis.ZFSVolume) error {
return nil
}
// CreateClone creates clone for the zvol/dataset as per
// info provided in ZFSVolume object
func CreateClone(vol *apis.ZFSVolume) error {
volume := vol.Spec.PoolName + "/" + vol.Name
if err := getVolume(volume); err != nil {
var args []string
args = buildCloneCreateArgs(vol)
cmd := exec.Command(ZFSVolCmd, args...)
out, err := cmd.CombinedOutput()
if err != nil {
logrus.Errorf(
"zfs: could not clone volume %v cmd %v error: %s", volume, args, string(out),
)
return err
}
logrus.Infof("created clone %s", volume)
} else if err == nil {
logrus.Infof("using existing clone volume %v", volume)
}
return nil
}
// SetDatasetMountProp sets mountpoint for the volume
func SetDatasetMountProp(volume string, mountpath string) error {
var ZFSVolArg []string
@ -335,6 +405,9 @@ func DestroyVolume(vol *apis.ZFSVolume) error {
volume := vol.Spec.PoolName + "/" + vol.Name
if err := getVolume(volume); err != nil {
logrus.Errorf(
"destroy: volume %v is not present, error: %s", volume, err.Error(),
)
return nil
}
@ -353,6 +426,59 @@ func DestroyVolume(vol *apis.ZFSVolume) error {
return nil
}
// CreateSnapshot creates the zfs volume snapshot
func CreateSnapshot(snap *apis.ZFSSnapshot) error {
volume := snap.Labels[ZFSVolKey]
snapDataset := snap.Spec.PoolName + "/" + volume + "@" + snap.Name
if err := getVolume(snapDataset); err == nil {
logrus.Infof("snapshot already there %s", snapDataset)
// snapshot already there just return
return nil
}
args := buildZFSSnapCreateArgs(snap)
cmd := exec.Command(ZFSVolCmd, args...)
out, err := cmd.CombinedOutput()
if err != nil {
logrus.Errorf(
"zfs: could not create snapshot %v@%v cmd %v error: %s", volume, snap.Name, args, string(out),
)
return err
}
logrus.Infof("created snapshot %s@%s", volume, snap.Name)
return nil
}
// DestroySnapshot deletes the zfs volume snapshot
func DestroySnapshot(snap *apis.ZFSSnapshot) error {
volume := snap.Labels[ZFSVolKey]
snapDataset := snap.Spec.PoolName + "/" + volume + "@" + snap.Name
if err := getVolume(snapDataset); err != nil {
logrus.Errorf(
"destroy: snapshot %v is not present, error: %s", volume, err.Error(),
)
return nil
}
args := buildZFSSnapDestroyArgs(snap)
cmd := exec.Command(ZFSVolCmd, args...)
out, err := cmd.CombinedOutput()
if err != nil {
logrus.Errorf(
"zfs: could not destroy snapshot %v@%v cmd %v error: %s", volume, snap.Name, args, string(out),
)
return err
}
logrus.Infof("deleted snapshot %s@%s", volume, snap.Name)
return nil
}
// GetVolumeDevPath returns devpath for the given volume
func GetVolumeDevPath(vol *apis.ZFSVolume) (string, error) {
volume := vol.Spec.PoolName + "/" + vol.Name

View file

@ -20,7 +20,7 @@ import (
"github.com/Sirupsen/logrus"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/openebs/zfs-localpv/pkg/builder"
"github.com/openebs/zfs-localpv/pkg/builder/volbuilder"
"github.com/openebs/zfs-localpv/tests/deploy"
"github.com/openebs/zfs-localpv/tests/pod"
"github.com/openebs/zfs-localpv/tests/pvc"
@ -41,7 +41,7 @@ const (
)
var (
ZFSClient *builder.Kubeclient
ZFSClient *volbuilder.Kubeclient
SCClient *sc.Kubeclient
PVCClient *pvc.Kubeclient
DeployClient *deploy.Kubeclient
@ -74,7 +74,7 @@ func init() {
PVCClient = pvc.NewKubeClient(pvc.WithKubeConfigPath(KubeConfigPath))
DeployClient = deploy.NewKubeClient(deploy.WithKubeConfigPath(KubeConfigPath))
PodClient = pod.NewKubeClient(pod.WithKubeConfigPath(KubeConfigPath))
ZFSClient = builder.NewKubeclient(builder.WithKubeConfigPath(KubeConfigPath))
ZFSClient = volbuilder.NewKubeclient(volbuilder.WithKubeConfigPath(KubeConfigPath))
}
func TestSource(t *testing.T) {