mirror of
https://github.com/TECHNOFAB11/zfs-localpv.git
synced 2025-12-12 06:20:11 +01:00
Applications who want to share a volume can use below storageclass to make their volumes shared by multiple pods ```yaml apiVersion: storage.k8s.io/v1 kind: StorageClass metadata: name: openebs-zfspv parameters: shared: "yes" fstype: "zfs" poolname: "zfspv-pool" provisioner: zfs.csi.openebs.io ``` Now the provisioned volume using this storageclass can be used by multiple pods. Here pods have to make sure of the data consistency and have to have locking mechanism. One thing to note here is pods will be scheduled to the node where volume is present so that all the pods can use the same volume as they can access it locally only. This was we can avoid the NFS overhead and can get the optimal performance also. Also fixed the log formatting in the GRPC log. Signed-off-by: Pawan <pawan@mayadata.io>
197 lines
5 KiB
Go
197 lines
5 KiB
Go
/*
|
|
Copyright 2017 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package driver
|
|
|
|
import (
|
|
"bytes"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
|
|
"k8s.io/klog"
|
|
|
|
"github.com/container-storage-interface/spec/lib/go/csi"
|
|
)
|
|
|
|
// parseEndpoint should have a valid prefix(unix/tcp) to return a valid endpoint parts
|
|
func parseEndpoint(ep string) (string, string, error) {
|
|
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
|
|
s := strings.SplitN(ep, "://", 2)
|
|
if s[1] != "" {
|
|
return s[0], s[1], nil
|
|
}
|
|
}
|
|
return "", "", fmt.Errorf("Invalid endpoint: %v", ep)
|
|
}
|
|
|
|
//filters if the logd are informative or pollutant
|
|
func isInfotrmativeLog(info string) bool {
|
|
|
|
// add the messages that pollute logs to the array
|
|
var msgsToFilter = [][]byte{
|
|
[]byte("NodeGetVolumeStats"),
|
|
[]byte("NodeGetCapabilities"),
|
|
}
|
|
|
|
// checks for message in request
|
|
for _, msg := range msgsToFilter {
|
|
if bytes.Contains([]byte(info), msg) {
|
|
return false
|
|
}
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// logGRPC logs all the grpc related errors, i.e the final errors
|
|
// which are returned to the grpc clients
|
|
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
|
|
|
|
log := isInfotrmativeLog(info.FullMethod)
|
|
if log == true {
|
|
klog.Infof("GRPC call: %s requests %s", info.FullMethod, protosanitizer.StripSecrets(req))
|
|
}
|
|
|
|
resp, err := handler(ctx, req)
|
|
|
|
if log == true {
|
|
if err != nil {
|
|
klog.Errorf("GRPC error: %v", err)
|
|
} else {
|
|
klog.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
|
|
}
|
|
}
|
|
return resp, err
|
|
}
|
|
|
|
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces
|
|
type NonBlockingGRPCServer interface {
|
|
// Start services at the endpoint
|
|
Start()
|
|
|
|
// Waits for the service to stop
|
|
Wait()
|
|
|
|
// Stops the service gracefully
|
|
Stop()
|
|
|
|
// Stops the service forcefully
|
|
ForceStop()
|
|
}
|
|
|
|
// NewNonBlockingGRPCServer returns a new instance of NonBlockingGRPCServer
|
|
func NewNonBlockingGRPCServer(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) NonBlockingGRPCServer {
|
|
return &nonBlockingGRPCServer{
|
|
endpoint: ep,
|
|
idntyServer: ids,
|
|
ctrlServer: cs,
|
|
agentServer: ns}
|
|
}
|
|
|
|
// NonBlocking server
|
|
// dont block the execution for a task to complete.
|
|
// use wait group to wait for all the tasks dispatched.
|
|
type nonBlockingGRPCServer struct {
|
|
wg sync.WaitGroup
|
|
server *grpc.Server
|
|
endpoint string
|
|
idntyServer csi.IdentityServer
|
|
ctrlServer csi.ControllerServer
|
|
agentServer csi.NodeServer
|
|
}
|
|
|
|
// Start grpc server for serving CSI endpoints
|
|
func (s *nonBlockingGRPCServer) Start() {
|
|
|
|
s.wg.Add(1)
|
|
|
|
go s.serve(s.endpoint, s.idntyServer, s.ctrlServer, s.agentServer)
|
|
|
|
return
|
|
}
|
|
|
|
// Wait for the service to stop
|
|
func (s *nonBlockingGRPCServer) Wait() {
|
|
s.wg.Wait()
|
|
}
|
|
|
|
// Stop the service forcefully
|
|
func (s *nonBlockingGRPCServer) Stop() {
|
|
s.server.GracefulStop()
|
|
}
|
|
|
|
// ForceStop the service
|
|
func (s *nonBlockingGRPCServer) ForceStop() {
|
|
s.server.Stop()
|
|
}
|
|
|
|
// serve starts serving requests at the provided endpoint based on the type of
|
|
// plugin. In this function all the csi related interfaces are provided by
|
|
// container-storage-interface
|
|
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
|
|
|
|
proto, addr, err := parseEndpoint(endpoint)
|
|
if err != nil {
|
|
klog.Fatal(err.Error())
|
|
}
|
|
|
|
// Clear off the addr if it is already present, this is done to remove stale
|
|
// entries, as this path is shared with the OS and will be the same
|
|
// everytime the plugin restarts, its possible that the last instance leaves
|
|
// a stale entry
|
|
if proto == "unix" {
|
|
addr = "/" + addr
|
|
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
|
|
klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
|
|
}
|
|
}
|
|
|
|
listener, err := net.Listen(proto, addr)
|
|
if err != nil {
|
|
klog.Fatalf("Failed to listen: %v", err)
|
|
}
|
|
|
|
opts := []grpc.ServerOption{
|
|
grpc.UnaryInterceptor(logGRPC),
|
|
}
|
|
// Create a new grpc server, all the request from csi client to
|
|
// create/delete/... will hit this server
|
|
server := grpc.NewServer(opts...)
|
|
s.server = server
|
|
|
|
if ids != nil {
|
|
csi.RegisterIdentityServer(server, ids)
|
|
}
|
|
if cs != nil {
|
|
csi.RegisterControllerServer(server, cs)
|
|
}
|
|
if ns != nil {
|
|
csi.RegisterNodeServer(server, ns)
|
|
}
|
|
|
|
klog.Infof("Listening for connections on address: %#v", listener.Addr())
|
|
|
|
// Start serving requests on the grpc server created
|
|
server.Serve(listener)
|
|
|
|
}
|