zfs-localpv/pkg/driver/grpc.go

198 lines
5.1 KiB
Go
Raw Normal View History

/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package driver
import (
"bytes"
"fmt"
"net"
"os"
"strings"
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
"github.com/kubernetes-csi/csi-lib-utils/protosanitizer"
"k8s.io/klog"
"github.com/container-storage-interface/spec/lib/go/csi"
)
// parseEndpoint should have a valid prefix(unix/tcp) to return a valid endpoint parts
func parseEndpoint(ep string) (string, string, error) {
if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") {
s := strings.SplitN(ep, "://", 2)
if s[1] != "" {
return s[0], s[1], nil
}
}
return "", "", fmt.Errorf("invalid endpoint: %v", ep)
}
//filters if the logd are informative or pollutant
func isInfotrmativeLog(info string) bool {
// add the messages that pollute logs to the array
var msgsToFilter = [][]byte{
[]byte("NodeGetVolumeStats"),
[]byte("NodeGetCapabilities"),
}
// checks for message in request
for _, msg := range msgsToFilter {
if bytes.Contains([]byte(info), msg) {
return false
}
}
return true
}
// logGRPC logs all the grpc related errors, i.e the final errors
// which are returned to the grpc clients
func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
log := isInfotrmativeLog(info.FullMethod)
if log {
klog.Infof("GRPC call: %s requests %s", info.FullMethod, protosanitizer.StripSecrets(req))
}
resp, err := handler(ctx, req)
if log {
if err != nil {
klog.Errorf("GRPC error: %v", err)
} else {
klog.Infof("GRPC response: %s", protosanitizer.StripSecrets(resp))
}
}
return resp, err
}
// NonBlockingGRPCServer defines Non blocking GRPC server interfaces
type NonBlockingGRPCServer interface {
// Start services at the endpoint
Start()
// Waits for the service to stop
Wait()
// Stops the service gracefully
Stop()
// Stops the service forcefully
ForceStop()
}
// NewNonBlockingGRPCServer returns a new instance of NonBlockingGRPCServer
func NewNonBlockingGRPCServer(ep string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) NonBlockingGRPCServer {
return &nonBlockingGRPCServer{
endpoint: ep,
idntyServer: ids,
ctrlServer: cs,
agentServer: ns}
}
// NonBlocking server
// dont block the execution for a task to complete.
// use wait group to wait for all the tasks dispatched.
type nonBlockingGRPCServer struct {
wg sync.WaitGroup
server *grpc.Server
endpoint string
idntyServer csi.IdentityServer
ctrlServer csi.ControllerServer
agentServer csi.NodeServer
}
// Start grpc server for serving CSI endpoints
func (s *nonBlockingGRPCServer) Start() {
s.wg.Add(1)
go s.serve(s.endpoint, s.idntyServer, s.ctrlServer, s.agentServer)
}
// Wait for the service to stop
func (s *nonBlockingGRPCServer) Wait() {
s.wg.Wait()
}
// Stop the service forcefully
func (s *nonBlockingGRPCServer) Stop() {
s.server.GracefulStop()
}
// ForceStop the service
func (s *nonBlockingGRPCServer) ForceStop() {
s.server.Stop()
}
// serve starts serving requests at the provided endpoint based on the type of
// plugin. In this function all the csi related interfaces are provided by
// container-storage-interface
func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) {
proto, addr, err := parseEndpoint(endpoint)
if err != nil {
klog.Fatal(err.Error())
}
// Clear off the addr if it is already present, this is done to remove stale
// entries, as this path is shared with the OS and will be the same
// everytime the plugin restarts, its possible that the last instance leaves
// a stale entry
if proto == "unix" {
addr = "/" + addr
if err := os.Remove(addr); err != nil && !os.IsNotExist(err) {
klog.Fatalf("Failed to remove %s, error: %s", addr, err.Error())
}
}
listener, err := net.Listen(proto, addr)
if err != nil {
klog.Fatalf("Failed to listen: %v", err)
}
opts := []grpc.ServerOption{
grpc.UnaryInterceptor(logGRPC),
}
// Create a new grpc server, all the request from csi client to
// create/delete/... will hit this server
server := grpc.NewServer(opts...)
s.server = server
if ids != nil {
csi.RegisterIdentityServer(server, ids)
}
if cs != nil {
csi.RegisterControllerServer(server, cs)
}
if ns != nil {
csi.RegisterNodeServer(server, ns)
}
klog.Infof("Listening for connections on address: %#v", listener.Addr())
// Start serving requests on the grpc server created
err = server.Serve(listener)
if err != nil {
klog.Fatal(err.Error())
}
}