k8s 目前主要支持 CPU 和内存两种资源。为了支持用户按需调度和分配其他类型的硬件资源（例如机器学习场景中常用的 GPU），k8s 实现了设备插件框架（device plugin framework），用于集成这些硬件资源。今天来看下其内部的关键实现。
此时就会有两种状态：期望状态与实际状态。因为 socket 文件存在，所以该插件服务的期望状态是"需要注册"；但实际上可能由于某些原因，这个插件服务并没有完成注册。后续会不断地根据期望状态调整实际状态，从而使两者达到一致。
type Watcher struct {
// 感知插件服务注册的socket的路径
path string
fs utilfs.Filesystem
// inotify监测插件服务socket变化
fsWatcher *fsnotify.Watcher
stopped chan struct{}
// 存储期望状态
desiredStateOfWorld cache.DesiredStateOfWorld
func (w *Watcher) init() error {
klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)
if err := w.fs.MkdirAll(w.path, 0755); err != nil {
return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
return nil
// Event-processing goroutine (excerpt): consumes fsnotify events and errors
// until stopCh fires. stopCh is not declared in this excerpt; presumably it is
// a parameter of the enclosing start function — TODO confirm.
go func(fsWatcher *fsnotify.Watcher) {
// Signal anyone waiting on w.stopped once this goroutine exits.
defer close(w.stopped)
for {
select {
case event := <-fsWatcher.Events:
// Newly created files are handed to handleCreateEvent; failures are only logged.
if event.Op&fsnotify.Create == fsnotify.Create {
err := w.handleCreateEvent(event)
if err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
// NOTE(review): the body of the Remove branch is not shown in this excerpt;
// presumably it calls w.handleDeleteEvent(event) — confirm against the full source.
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
case err := <-fsWatcher.Errors:
// Watcher errors are logged and otherwise ignored.
if err != nil {
klog.Errorf("fsWatcher received error: %v", err)
case <-stopCh:
// In case of plugin watcher being stopped by plugin manager, stop
// probing the creation/deletion of plugin sockets.
// Also give all pending go routines a chance to complete
// NOTE(review): in this excerpt w.stopped is closed only by the deferred close
// at the top of this same goroutine, i.e. after it returns, so the
// `<-w.stopped` case cannot fire while we wait here and the select always
// ends via the 11s timeout. The excerpt also shows no return/close after this
// select — presumably elided; confirm against the full source.
select {
case <-w.stopped:
case <-time.After(11 * time.Second):
klog.Errorf("timeout on stopping watcher")
func (w *Watcher) traversePluginDir(dir string) error {
return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if path == dir {
return fmt.Errorf("error accessing path: %s error: %v", path, err)
klog.Errorf("error accessing path: %s error: %v", path, err)
return nil
switch mode := info.Mode(); {
case mode.IsDir():
if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
case mode&os.ModeSocket != 0:
event := fsnotify.Event{
Name: path,
Op: fsnotify.Create,
//TODO: Handle errors by taking corrective measures
if err := w.handleCreateEvent(event); err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
return nil
func (w *Watcher) handlePluginRegistration(socketPath string) error {
if runtime.GOOS == "windows" {
socketPath = util.NormalizePath(socketPath)
// 调用期望状态进行更新
klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath)
if err != nil {
return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
return nil
func (w *Watcher) handleDeleteEvent(event fsnotify.Event) {
klog.V(6).Infof("Handling delete event: %v", event)
socketPath := event.Name
klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
// PluginInfo is the per-plugin record kept by the state caches: the path of
// the plugin's registration socket and an associated timestamp (presumably the
// time of the last add/update — confirm against the cache implementation).
type PluginInfo struct {
	SocketPath string
	Timestamp  time.Time
}
type desiredStateOfWorld struct {
socketFileToInfo map[string]PluginInfo
type actualStateOfWorld struct {
socketFileToInfo map[string]PluginInfo
registerPluginFunc := func() error {
client, conn, err := dial(socketPath, dialTimeoutDuration)
if err != nil {
return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
infoResp, err := client.GetInfo(ctx, ®isterapi.InfoRequest{})
if err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
handler, ok := pluginHandlers[infoResp.Type]
if !ok {
if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
if infoResp.Endpoint == "" {
infoResp.Endpoint = socketPath
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
err = actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath,
Timestamp: timestamp,
if err != nil {
klog.Errorf("RegisterPlugin error -- failed to add plugin at socket %s, err: %v", socketPath, err)
// 调用插件的注册回调函数
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
if err := og.notifyPlugin(client, true, ""); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", addr)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
return registerapi.NewRegistrationClient(c), c, nil
