docker k8s二次开发(二)api的实现
2017-10-18 10:19
896 查看
// Package k8sengine exposes go-restful handlers that create, update, delete,
// scale and inspect Kubernetes workloads (Deployment or StatefulSet plus the
// matching Service and Ingress) through the Kubernetes internal clientset.
package k8sengine

import (
	"errors"
	"io"
	"net/http"

	restful "github.com/emicklei/go-restful"
	"github.com/golang/glog"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	restclient "k8s.io/client-go/rest"
	client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"

	"wdcp-build/apiserver/options"
	"wdcp-build/pkg/api/unversioned"
	"wdcp-build/pkg/db"
	"wdcp-build/pkg/util"
)

// K8sCli bundles the parameters of a single API request with the server run
// options. The resource helpers of this package are methods on it.
type K8sCli struct {
	k8sParam     unversioned.K8sParam
	k8sRunOption *options.ServerRunOptions
}

// readK8sParam decodes the request body into a K8sParam. When decoding fails
// it writes the error response itself and reports false, so the caller must
// simply return.
func readK8sParam(request *restful.Request, response *restful.Response) (*unversioned.K8sParam, bool) {
	param := new(unversioned.K8sParam)
	if err := request.ReadEntity(param); err != nil {
		response.WriteError(http.StatusInternalServerError, err)
		return nil, false
	}
	return param, true
}

// kubeClientFor builds a clientset that talks to the kube proxy configured for
// clusterLevel. It returns an error instead of panicking when the
// configuration is missing or invalid.
func kubeClientFor(clusterLevel string) (*client.Clientset, error) {
	kubeOption, err := util.GetKubeConfig(clusterLevel)
	if err != nil {
		return nil, err
	}
	if kubeOption.KubeProxyURL == nil {
		return nil, errors.New("kube proxy url is not configured for cluster level " + clusterLevel)
	}
	return client.NewForConfig(&restclient.Config{Host: *kubeOption.KubeProxyURL})
}

// writeFailure sends the generic failure envelope carrying err's message.
func writeFailure(response *restful.Response, err error) {
	io.WriteString(response, util.ApiResponse(unversioned.RetCodeFailed, err.Error(), []db.TaskEventLog{}))
}

// writeK8sSuccess sends the success envelope that carries the service port and
// the ingress cluster configuration of k8sCli.
func writeK8sSuccess(response *restful.Response, k8sCli *K8sCli) {
	io.WriteString(response, util.ApiK8sResponse(unversioned.RetCodeSuccess, unversioned.RetMsgSuccess,
		k8sCli.k8sParam.ServicePort, k8sCli.k8sParam.IngressClusterConfig))
}

// rollbackWorkload removes the Deployment or StatefulSet created earlier in a
// failed request. It is best effort: the delete helpers log their own outcome
// and the original failure is what gets reported to the client.
func rollbackWorkload(k8sCli *K8sCli, kubeClient *client.Clientset) {
	if k8sCli.k8sParam.IsStateful {
		k8sCli.DeleteKubeStatefulSet(kubeClient)
		return
	}
	k8sCli.DeleteKubeDeployment(kubeClient)
}

// CreateK8sHandler creates the workload (StatefulSet or Deployment), its
// Service and its Ingress, in that order. When a later step fails, the
// resources created by the earlier steps are deleted again and a single
// failure response is written.
func CreateK8sHandler(request *restful.Request, response *restful.Response) {
	param, ok := readK8sParam(request, response)
	if !ok {
		return
	}
	glog.Infof("createK8sHandler>>>%+v", param)
	glog.Infof("createK8sHandler>>>%v", param.Protocol)
	glog.Infof("createK8sHandler>>>%v", param.TargetPort)

	k8sCli := K8sCli{k8sParam: *param, k8sRunOption: util.RunOption}
	kubeClient, err := kubeClientFor(param.ClusterLevel)
	if err != nil {
		writeFailure(response, err)
		return
	}

	if k8sCli.k8sParam.IsStateful {
		err = k8sCli.CreateKubeStatefulSet(kubeClient)
	} else {
		err = k8sCli.CreateKubeDeployment(kubeClient)
	}
	if err != nil {
		writeFailure(response, err)
		return
	}

	if err = k8sCli.CreateKubeService(kubeClient); err != nil {
		rollbackWorkload(&k8sCli, kubeClient)
		writeFailure(response, err)
		return
	}

	glog.Infof("create_k8s_handler_post>>%+v", param)
	mappingPort, err := GenMappingPort(param)
	if err == nil && mappingPort == 0 {
		// A zero port is unusable even when no error was reported.
		err = errors.New("mapping port is 0")
	}
	if err != nil {
		rollbackWorkload(&k8sCli, kubeClient)
		k8sCli.DeleteKubeService(kubeClient)
		glog.Errorf("genMappingPort err:%s", err)
		writeFailure(response, err)
		return
	}

	k8sCli.k8sParam.ServicePort = mappingPort
	if err = k8sCli.CreateKubeIngress(mappingPort, kubeClient); err != nil {
		rollbackWorkload(&k8sCli, kubeClient)
		k8sCli.DeleteKubeService(kubeClient)
		writeFailure(response, err)
		return
	}
	writeK8sSuccess(response, &k8sCli)
}

// UpdateK8sHandler updates the workload, then the Service, then the Ingress.
// It stops at the first failing step and reports that error; nothing is
// rolled back.
func UpdateK8sHandler(request *restful.Request, response *restful.Response) {
	param, ok := readK8sParam(request, response)
	if !ok {
		return
	}
	k8sCli := K8sCli{k8sParam: *param, k8sRunOption: util.RunOption}
	kubeClient, err := kubeClientFor(param.ClusterLevel)
	if err != nil {
		writeFailure(response, err)
		return
	}

	if k8sCli.k8sParam.IsStateful {
		err = k8sCli.UpdateKubeStatefulSet(kubeClient)
	} else {
		err = k8sCli.UpdateKubeDeployment(kubeClient)
	}
	if err != nil {
		writeFailure(response, err)
		return
	}

	if err = k8sCli.UpdateKubeService(kubeClient); err != nil {
		writeFailure(response, err)
		return
	}

	glog.Infof("create_k8s_handler_put>>%+v", param)
	mappingPort, err := GenMappingPort(param)
	if err != nil {
		writeFailure(response, err)
		return
	}

	k8sCli.k8sParam.ServicePort = mappingPort
	if err = k8sCli.UpdateKubeIngress(mappingPort, kubeClient); err != nil {
		writeFailure(response, err)
		return
	}
	writeK8sSuccess(response, &k8sCli)
}

// DeleteHandler deletes the workload, the Service and the Ingress. Every
// deletion is attempted even if an earlier one failed; all failures are
// reported together as one aggregated error.
func DeleteHandler(request *restful.Request, response *restful.Response) {
	param, ok := readK8sParam(request, response)
	if !ok {
		return
	}
	k8sCli := K8sCli{k8sParam: *param, k8sRunOption: util.RunOption}
	kubeClient, err := kubeClientFor(param.ClusterLevel)
	if err != nil {
		writeFailure(response, err)
		return
	}

	var errs []error
	if k8sCli.k8sParam.IsStateful {
		err = k8sCli.DeleteKubeStatefulSet(kubeClient)
	} else {
		err = k8sCli.DeleteKubeDeployment(kubeClient)
	}
	if err != nil {
		errs = append(errs, err)
	}
	if err = k8sCli.DeleteKubeService(kubeClient); err != nil {
		errs = append(errs, err)
	}
	if err = k8sCli.DeleteKubeIngress(kubeClient); err != nil {
		errs = append(errs, err)
	}

	if agg := utilerrors.NewAggregate(errs); agg != nil {
		writeFailure(response, agg)
		return
	}
	io.WriteString(response, util.ApiResponse(unversioned.RetCodeSuccess, unversioned.RetMsgSuccess, []db.TaskEventLog{}))
}

// GetDeploymentHandler looks up the Deployment identified by the
// "cluster_level", "tenant" (namespace) and "project_name" path parameters
// and writes it in the deployment response envelope.
func GetDeploymentHandler(request *restful.Request, response *restful.Response) {
	k8sCli := K8sCli{
		k8sParam: unversioned.K8sParam{
			NameSpace:    request.PathParameter("tenant"),
			ClusterLevel: request.PathParameter("cluster_level"),
			ProjectName:  request.PathParameter("project_name"),
		},
		k8sRunOption: util.RunOption,
	}
	kubeClient, err := kubeClientFor(k8sCli.k8sParam.ClusterLevel)
	if err != nil {
		io.WriteString(response, util.ApiDeploymentResponse(unversioned.RetCodeFailed, err.Error(), nil))
		return
	}

	deployment, err := k8sCli.GetKubeDeployment(kubeClient)
	if err != nil {
		io.WriteString(response, util.ApiDeploymentResponse(unversioned.RetCodeFailed, err.Error(), deployment))
		return
	}
	io.WriteString(response, util.ApiDeploymentResponse(unversioned.RetCodeSuccess, unversioned.RetMsgSuccess, deployment))
}

// GetStatefulSetHandler looks up the StatefulSet identified by the
// "cluster_level", "tenant" (namespace) and "project_name" path parameters
// and writes it in the statefulset response envelope.
func GetStatefulSetHandler(request *restful.Request, response *restful.Response) {
	k8sCli := K8sCli{
		k8sParam: unversioned.K8sParam{
			NameSpace:    request.PathParameter("tenant"),
			ClusterLevel: request.PathParameter("cluster_level"),
			ProjectName:  request.PathParameter("project_name"),
		},
		k8sRunOption: util.RunOption,
	}
	kubeClient, err := kubeClientFor(k8sCli.k8sParam.ClusterLevel)
	if err != nil {
		io.WriteString(response, util.ApiStatefulSetResponse(unversioned.RetCodeFailed, err.Error(), nil))
		return
	}

	statefulset, err := k8sCli.GetKubeStatefulSet(kubeClient)
	if err != nil {
		io.WriteString(response, util.ApiStatefulSetResponse(unversioned.RetCodeFailed, err.Error(), statefulset))
		return
	}
	io.WriteString(response, util.ApiStatefulSetResponse(unversioned.RetCodeSuccess, unversioned.RetMsgSuccess, statefulset))
}

// ScaleK8sHandler sets the replica count of the Deployment or StatefulSet
// named in the request body through the scale sub-resource.
func ScaleK8sHandler(request *restful.Request, response *restful.Response) {
	param, ok := readK8sParam(request, response)
	if !ok {
		return
	}
	k8sCli := K8sCli{k8sParam: *param, k8sRunOption: util.RunOption}
	kubeClient, err := kubeClientFor(param.ClusterLevel)
	if err != nil {
		writeFailure(response, err)
		return
	}

	kind := unversioned.APIResourceKindDeployments
	if k8sCli.k8sParam.IsStateful {
		kind = unversioned.APIResourceKindStatefulsets
	}
	if err = k8sCli.ScaleApp(kind, kubeClient); err != nil {
		writeFailure(response, err)
		return
	}
	io.WriteString(response, util.ApiResponse(unversioned.RetCodeSuccess, unversioned.RetMsgSuccess, []db.TaskEventLog{}))
}
// Deployment and StatefulSet construction plus their create, update, delete,
// get and scale operations.

package k8sengine

import (
	"encoding/json"
	"fmt"
	"strings"

	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/apps"
	"k8s.io/kubernetes/pkg/apis/extensions"
	client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"

	types "wdcp-build/pkg/api/unversioned"
	"wdcp-build/pkg/util"
)

// Pod annotation keys used for bandwidth settings.
const (
	KubeIngBw string = "kubernetes.io/ingress-bandwidth"
	KubeEgBw  string = "kubernetes.io/egress-bandwidth"
)

// genObjectMeta returns the object metadata shared by the workload objects:
// the project name as object name, inside the request's namespace.
func (k8sCli *K8sCli) genObjectMeta() metav1.ObjectMeta {
	return metav1.ObjectMeta{
		Name:      k8sCli.k8sParam.ProjectName,
		Namespace: k8sCli.k8sParam.NameSpace,
	}
}

// genTemplate builds the pod template used by both Deployments and
// StatefulSets: a privileged application container plus a "flume" sidecar
// that receives the log volume mounts. For stateful workloads the first
// stateful volume is additionally mounted into the sidecar's mount list.
func (k8sCli *K8sCli) genTemplate() api.PodTemplateSpec {
	p := &k8sCli.k8sParam
	privileged := true

	flumeImg, err := util.GetBasicImage("flume", p.ClusterLevel)
	if err != nil {
		glog.Warningf("get flume image for %v err:%v", p.ClusterLevel, err)
	}

	envs := GenEvns(p.Envs, p.ImageType, p.ProjectName, p.ClusterLevel)
	var volumeMounts []api.VolumeMount
	envs, volumeMounts = LogVolumes(p.ImageType, envs)

	// Guard the index: a stateful request without volumes must not panic here.
	if p.IsStateful && len(p.StatefulVolumes) > 0 {
		volumeMounts = append(volumeMounts, api.VolumeMount{
			Name:      fmt.Sprintf("%s-%s", p.ClusterLevel, p.ProjectName),
			MountPath: p.StatefulVolumes[0].VolumePath,
		})
	}

	volumes := MountVolumns(p.ImageType, p.ClusterLevel, p.ProjectName, envs)
	containerName := GenContainerName(p.ProjectName)
	nodeSelector := GenNodeSelector(p.NodeLabels)

	// The application container requests exactly what it is limited to.
	appResources := api.ResourceList{
		api.ResourceCPU:    *resource.NewQuantity(int64(p.CpuCores), resource.DecimalSI),
		api.ResourceMemory: *resource.NewQuantity(p.Memory*(types.K8sResMemG), resource.DecimalSI),
	}
	appRequests := api.ResourceList{
		api.ResourceCPU:    *resource.NewQuantity(int64(p.CpuCores), resource.DecimalSI),
		api.ResourceMemory: *resource.NewQuantity(p.Memory*(types.K8sResMemG), resource.DecimalSI),
	}

	podTmplSpec := api.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{"name": p.ProjectName},
		},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{
					Name:            containerName,
					Image:           p.Image,
					ImagePullPolicy: api.PullAlways,
					SecurityContext: &api.SecurityContext{Privileged: &privileged},
					Resources: api.ResourceRequirements{
						Limits:   appResources,
						Requests: appRequests,
					},
					Env:     envs,
					Command: p.RunCmds,
				},
				{
					Name:            "flume",
					Image:           flumeImg,
					ImagePullPolicy: api.PullAlways,
					Resources: api.ResourceRequirements{
						Limits: api.ResourceList{
							api.ResourceCPU:    resource.MustParse(types.K8sResFlumeCpuLimit),
							api.ResourceMemory: *resource.NewQuantity(types.K8sResFlumeMemLimt*(types.K8sResMemM), resource.DecimalSI),
						},
						Requests: api.ResourceList{
							api.ResourceCPU:    resource.MustParse(types.K8sResFlumeCpuRequest),
							api.ResourceMemory: *resource.NewQuantity(types.K8sResFlumeMemRequest*(types.K8sResMemM), resource.DecimalSI),
						},
					},
					VolumeMounts: volumeMounts,
				},
			},
			Volumes:       volumes,
			RestartPolicy: api.RestartPolicyAlways,
			DNSPolicy:     api.DNSClusterFirst,
			NodeSelector:  nodeSelector,
		},
	}

	bwAn, ingBwFlag, egBwFlag := GenBandWidth(p.BandWidth)
	if ingBwFlag || egBwFlag {
		podTmplSpec.ObjectMeta.Annotations = bwAn
	}
	return podTmplSpec
}

// genDeployment builds the Deployment for the request. The strategy is
// Recreate only when the request asks for it (case-insensitively); anything
// else results in RollingUpdate. One old revision is kept.
func (k8sCli *K8sCli) genDeployment() *extensions.Deployment {
	revisionHistoryLimit := int32(1)

	strategyType := extensions.RecreateDeploymentStrategyType
	if strings.ToLower(k8sCli.k8sParam.Strategy) != types.StrategyTypeRecreate {
		strategyType = extensions.RollingUpdateDeploymentStrategyType
	}

	deployment := &extensions.Deployment{
		ObjectMeta: k8sCli.genObjectMeta(),
		Spec: extensions.DeploymentSpec{
			Replicas: k8sCli.k8sParam.Replicas,
			Strategy: extensions.DeploymentStrategy{Type: strategyType},
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"name": k8sCli.k8sParam.ProjectName},
			},
			RevisionHistoryLimit: &revisionHistoryLimit,
			Template:             k8sCli.genTemplate(),
		},
	}

	// The JSON dump is for diagnostics only; a marshal failure is not fatal.
	if b, err := json.Marshal(deployment); err == nil {
		glog.Info(string(b))
	}
	return deployment
}

// genStatefulSet builds the StatefulSet for the request, including one volume
// claim template derived from the first stateful volume. Callers must make
// sure k8sParam.StatefulVolumes is non-empty (CreateKubeStatefulSet and
// UpdateKubeStatefulSet check this before calling).
func (k8sCli *K8sCli) genStatefulSet() *apps.StatefulSet {
	p := &k8sCli.k8sParam
	revisionHistoryLimit := int32(1)
	volume := p.StatefulVolumes[0]

	// The storage class is named "<cluster level>-<volume type>-sc".
	storageClassName := fmt.Sprintf("%s-%s-sc", p.ClusterLevel, volume.VolumeType)
	pvcName := fmt.Sprintf("%s-%s", p.ClusterLevel, p.ProjectName)

	accessMode := api.ReadWriteOnce
	if volume.VolumeType == "glusterfs" {
		accessMode = api.ReadWriteMany
	}

	strategyType := apps.StatefulSetUpdateStrategyType(apps.OnDeleteStatefulSetStrategyType)
	if strings.ToLower(p.Strategy) != types.StrategyTypeRecreate {
		strategyType = apps.StatefulSetUpdateStrategyType(apps.RollingUpdateStatefulSetStrategyType)
	}

	sts := &apps.StatefulSet{
		ObjectMeta: k8sCli.genObjectMeta(),
		Spec: apps.StatefulSetSpec{
			Replicas: p.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"name": p.ProjectName},
			},
			Template:             k8sCli.genTemplate(),
			ServiceName:          p.ProjectName,
			PodManagementPolicy:  apps.ParallelPodManagement,
			RevisionHistoryLimit: &revisionHistoryLimit,
			UpdateStrategy:       apps.StatefulSetUpdateStrategy{Type: strategyType},
			VolumeClaimTemplates: []api.PersistentVolumeClaim{
				{
					ObjectMeta: metav1.ObjectMeta{Name: pvcName},
					Spec: api.PersistentVolumeClaimSpec{
						StorageClassName: &storageClassName,
						AccessModes:      []api.PersistentVolumeAccessMode{accessMode},
						Resources: api.ResourceRequirements{
							Requests: api.ResourceList{
								api.ResourceStorage: *resource.NewQuantity(types.K8sResStorageG*(volume.VolumeSize), resource.DecimalSI),
							},
						},
					},
				},
			},
		},
	}

	// The JSON dump is for diagnostics only; a marshal failure is not fatal.
	if b, err := json.Marshal(sts); err == nil {
		glog.Info(string(b))
	}
	return sts
}

// CreateKubeDeployment creates the Deployment in the request's namespace and
// logs either the created object or the error.
func (k8sCli *K8sCli) CreateKubeDeployment(kubeClient *client.Clientset) error {
	dep, err := kubeClient.Deployments(k8sCli.k8sParam.NameSpace).Create(k8sCli.genDeployment())
	if err != nil {
		glog.Infof("create-deployment-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(dep); jerr == nil {
		glog.Infof("create deployment result>>>%s", string(b))
	}
	return nil
}

// CreateKubeStatefulSet creates the StatefulSet in the request's namespace.
// It returns an error, without calling the API, when the request carries no
// stateful volume.
func (k8sCli *K8sCli) CreateKubeStatefulSet(kubeClient *client.Clientset) error {
	if len(k8sCli.k8sParam.StatefulVolumes) == 0 {
		return fmt.Errorf("statefulset %v requires at least one stateful volume", k8sCli.k8sParam.ProjectName)
	}
	st, err := kubeClient.Apps().StatefulSets(k8sCli.k8sParam.NameSpace).Create(k8sCli.genStatefulSet())
	if err != nil {
		glog.Infof("create-statefulset-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(st); jerr == nil {
		glog.Infof("create statefulset result>>>%s", string(b))
	}
	return nil
}

// DeleteKubeDeployment deletes the Deployment and then, best effort, the
// ReplicaSets and Pods labelled name=<project name>. The ReplicaSets are
// listed before the Deployment is deleted. Failures to delete an individual
// ReplicaSet or Pod are logged but do not fail the call.
func (k8sCli *K8sCli) DeleteKubeDeployment(kubeClient *client.Clientset) error {
	namespace := k8sCli.k8sParam.NameSpace
	name := k8sCli.k8sParam.ProjectName
	selector := labels.Set(map[string]string{"name": name}).AsSelector().String()
	listOptions := metav1.ListOptions{LabelSelector: selector}

	rsList, err := kubeClient.ReplicaSets(namespace).List(listOptions)
	if err != nil {
		glog.Infof("get rs of %v err:%v", name, err)
		return err
	}

	err = kubeClient.Deployments(namespace).Delete(name, &metav1.DeleteOptions{})
	glog.Infof("delete deployment %v :%v", name, err)
	if err != nil {
		return err
	}

	for i := range rsList.Items {
		rsName := rsList.Items[i].Name
		if derr := kubeClient.ReplicaSets(namespace).Delete(rsName, &metav1.DeleteOptions{}); derr != nil {
			glog.Warningf("delete rs %v err:%v", rsName, derr)
		}
	}

	podList, err := kubeClient.Pods(namespace).List(listOptions)
	if err != nil {
		return err
	}
	for i := range podList.Items {
		podName := podList.Items[i].Name
		if derr := kubeClient.Pods(namespace).Delete(podName, &metav1.DeleteOptions{}); derr != nil {
			glog.Warningf("delete pod %v err:%v", podName, derr)
		}
	}
	return nil
}

// DeleteKubeStatefulSet deletes the StatefulSet and then, best effort, the
// Pods labelled name=<project name>. Failures to delete an individual Pod
// are logged but do not fail the call.
func (k8sCli *K8sCli) DeleteKubeStatefulSet(kubeClient *client.Clientset) error {
	namespace := k8sCli.k8sParam.NameSpace
	name := k8sCli.k8sParam.ProjectName
	selector := labels.Set(map[string]string{"name": name}).AsSelector().String()

	err := kubeClient.Apps().StatefulSets(namespace).Delete(name, &metav1.DeleteOptions{})
	glog.Infof("delete statefulset %v :%v", name, err)
	if err != nil {
		return err
	}

	podList, err := kubeClient.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return err
	}
	for i := range podList.Items {
		podName := podList.Items[i].Name
		if derr := kubeClient.Pods(namespace).Delete(podName, &metav1.DeleteOptions{}); derr != nil {
			glog.Warningf("delete pod %v err:%v", podName, derr)
		}
	}
	return nil
}

// GetKubeDeployment fetches the Deployment named after the project.
func (k8sCli *K8sCli) GetKubeDeployment(kubeClient *client.Clientset) (*extensions.Deployment, error) {
	return kubeClient.Deployments(k8sCli.k8sParam.NameSpace).Get(k8sCli.k8sParam.ProjectName, metav1.GetOptions{})
}

// GetKubeStatefulSet fetches the StatefulSet named after the project.
func (k8sCli *K8sCli) GetKubeStatefulSet(kubeClient *client.Clientset) (*apps.StatefulSet, error) {
	return kubeClient.Apps().StatefulSets(k8sCli.k8sParam.NameSpace).Get(k8sCli.k8sParam.ProjectName, metav1.GetOptions{})
}

// UpdateKubeDeployment replaces the Deployment with a freshly generated one
// and logs either the updated object or the error.
func (k8sCli *K8sCli) UpdateKubeDeployment(kubeClient *client.Clientset) error {
	dep, err := kubeClient.Deployments(k8sCli.k8sParam.NameSpace).Update(k8sCli.genDeployment())
	if err != nil {
		glog.Infof("update-deployment-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(dep); jerr == nil {
		glog.Infof("update deployment result>>>%s", string(b))
	}
	return nil
}

// UpdateKubeStatefulSet replaces the StatefulSet with a freshly generated
// one. It returns an error, without calling the API, when the request carries
// no stateful volume.
func (k8sCli *K8sCli) UpdateKubeStatefulSet(kubeClient *client.Clientset) error {
	if len(k8sCli.k8sParam.StatefulVolumes) == 0 {
		return fmt.Errorf("statefulset %v requires at least one stateful volume", k8sCli.k8sParam.ProjectName)
	}
	sts, err := kubeClient.Apps().StatefulSets(k8sCli.k8sParam.NameSpace).Update(k8sCli.genStatefulSet())
	if err != nil {
		glog.Infof("update-statefulset-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(sts); jerr == nil {
		glog.Infof("update statefulset result>>>%s", string(b))
	}
	return nil
}

// ScaleApp sets the replica count of the resource of the given kind, named
// after the project, through the scale sub-resource.
func (k8sCli *K8sCli) ScaleApp(kind string, kubeClient *client.Clientset) error {
	scale := extensions.Scale{
		ObjectMeta: metav1.ObjectMeta{
			Name:      k8sCli.k8sParam.ProjectName,
			Namespace: k8sCli.k8sParam.NameSpace,
		},
		Spec: extensions.ScaleSpec{
			Replicas: k8sCli.k8sParam.Replicas,
		},
	}
	_, err := kubeClient.Scales(k8sCli.k8sParam.NameSpace).Update(kind, &scale)
	return err
}
// Ingress construction and its create, update and delete operations.

package k8sengine

import (
	"encoding/json"
	"fmt"

	"github.com/golang/glog"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/kubernetes/pkg/apis/extensions"
	client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
)

// genKubeIngress builds the Ingress for the request. It has a single rule for
// the configured host and path, routed to the project's Service on the target
// port. servicePort is recorded in the "serverPort" annotation, next to the
// visual IP and the ingress class.
func (k8sCli *K8sCli) genKubeIngress(servicePort int32) *extensions.Ingress {
	p := &k8sCli.k8sParam

	backend := extensions.IngressBackend{
		ServiceName: p.ProjectName,
		ServicePort: intstr.IntOrString{
			Type:   intstr.Int,
			IntVal: p.TargetPort,
		},
	}

	ing := &extensions.Ingress{
		ObjectMeta: metav1.ObjectMeta{
			Name:      p.ProjectName,
			Namespace: p.NameSpace,
			Annotations: map[string]string{
				"serverPort":                  fmt.Sprintf("%d", servicePort),
				"visualIP":                    p.IngressParam.VisualIP,
				"kubernetes.io/ingress.class": p.IngressParam.IngressClusterName,
			},
		},
		Spec: extensions.IngressSpec{
			Rules: []extensions.IngressRule{
				{
					Host: p.IngressParam.Host,
					IngressRuleValue: extensions.IngressRuleValue{
						HTTP: &extensions.HTTPIngressRuleValue{
							Paths: []extensions.HTTPIngressPath{
								{
									Path:    p.IngressParam.Location,
									Backend: backend,
								},
							},
						},
					},
				},
			},
		},
	}

	// The JSON dump is for diagnostics only; a marshal failure is not fatal.
	if b, err := json.Marshal(ing); err == nil {
		glog.Info(string(b))
	}
	return ing
}

// CreateKubeIngress picks the ingress cluster via ChooseIngressCluster, then
// creates the Ingress and logs either the created object or the error.
func (k8sCli *K8sCli) CreateKubeIngress(servicePort int32, kubeClient *client.Clientset) error {
	k8sCli.ChooseIngressCluster(kubeClient)
	created, err := kubeClient.Extensions().Ingresses(k8sCli.k8sParam.NameSpace).Create(k8sCli.genKubeIngress(servicePort))
	if err != nil {
		glog.Errorf("ingress-create-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(created); jerr == nil {
		glog.Infof("create ingress result >>>%s", string(b))
	}
	return nil
}

// DeleteKubeIngress deletes the Ingress named after the project and logs the
// outcome.
func (k8sCli *K8sCli) DeleteKubeIngress(kubeClient *client.Clientset) error {
	err := kubeClient.Extensions().Ingresses(k8sCli.k8sParam.NameSpace).Delete(k8sCli.k8sParam.ProjectName, &metav1.DeleteOptions{})
	glog.Infof("delete ingress %v:%v", k8sCli.k8sParam.ProjectName, err)
	return err
}

// UpdateKubeIngress picks the ingress cluster via ChooseIngressCluster, then
// replaces the Ingress with a freshly generated one and logs either the
// updated object or the error.
func (k8sCli *K8sCli) UpdateKubeIngress(servicePort int32, kubeClient *client.Clientset) error {
	k8sCli.ChooseIngressCluster(kubeClient)
	updated, err := kubeClient.Extensions().Ingresses(k8sCli.k8sParam.NameSpace).Update(k8sCli.genKubeIngress(servicePort))
	if err != nil {
		glog.Errorf("ingress-update-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(updated); jerr == nil {
		glog.Infof("update ingress result >>>%s", string(b))
	}
	return nil
}
// Deployment and StatefulSet construction plus their create, update, delete,
// get and scale operations.

package k8sengine

import (
	"encoding/json"
	"fmt"
	"strings"

	"github.com/golang/glog"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/apis/apps"
	"k8s.io/kubernetes/pkg/apis/extensions"
	client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"

	types "wdcp-build/pkg/api/unversioned"
	"wdcp-build/pkg/util"
)

// Pod annotation keys used for bandwidth settings.
const (
	KubeIngBw string = "kubernetes.io/ingress-bandwidth"
	KubeEgBw  string = "kubernetes.io/egress-bandwidth"
)

// genObjectMeta returns the object metadata shared by the workload objects:
// the project name as object name, inside the request's namespace.
func (k8sCli *K8sCli) genObjectMeta() metav1.ObjectMeta {
	return metav1.ObjectMeta{
		Name:      k8sCli.k8sParam.ProjectName,
		Namespace: k8sCli.k8sParam.NameSpace,
	}
}

// genTemplate builds the pod template used by both Deployments and
// StatefulSets: a privileged application container plus a "flume" sidecar
// that receives the log volume mounts. For stateful workloads the first
// stateful volume is additionally mounted into the sidecar's mount list.
func (k8sCli *K8sCli) genTemplate() api.PodTemplateSpec {
	p := &k8sCli.k8sParam
	privileged := true

	flumeImg, err := util.GetBasicImage("flume", p.ClusterLevel)
	if err != nil {
		glog.Warningf("get flume image for %v err:%v", p.ClusterLevel, err)
	}

	envs := GenEvns(p.Envs, p.ImageType, p.ProjectName, p.ClusterLevel)
	var volumeMounts []api.VolumeMount
	envs, volumeMounts = LogVolumes(p.ImageType, envs)

	// Guard the index: a stateful request without volumes must not panic here.
	if p.IsStateful && len(p.StatefulVolumes) > 0 {
		volumeMounts = append(volumeMounts, api.VolumeMount{
			Name:      fmt.Sprintf("%s-%s", p.ClusterLevel, p.ProjectName),
			MountPath: p.StatefulVolumes[0].VolumePath,
		})
	}

	volumes := MountVolumns(p.ImageType, p.ClusterLevel, p.ProjectName, envs)
	containerName := GenContainerName(p.ProjectName)
	nodeSelector := GenNodeSelector(p.NodeLabels)

	// The application container requests exactly what it is limited to.
	appResources := api.ResourceList{
		api.ResourceCPU:    *resource.NewQuantity(int64(p.CpuCores), resource.DecimalSI),
		api.ResourceMemory: *resource.NewQuantity(p.Memory*(types.K8sResMemG), resource.DecimalSI),
	}
	appRequests := api.ResourceList{
		api.ResourceCPU:    *resource.NewQuantity(int64(p.CpuCores), resource.DecimalSI),
		api.ResourceMemory: *resource.NewQuantity(p.Memory*(types.K8sResMemG), resource.DecimalSI),
	}

	podTmplSpec := api.PodTemplateSpec{
		ObjectMeta: metav1.ObjectMeta{
			Labels: map[string]string{"name": p.ProjectName},
		},
		Spec: api.PodSpec{
			Containers: []api.Container{
				{
					Name:            containerName,
					Image:           p.Image,
					ImagePullPolicy: api.PullAlways,
					SecurityContext: &api.SecurityContext{Privileged: &privileged},
					Resources: api.ResourceRequirements{
						Limits:   appResources,
						Requests: appRequests,
					},
					Env:     envs,
					Command: p.RunCmds,
				},
				{
					Name:            "flume",
					Image:           flumeImg,
					ImagePullPolicy: api.PullAlways,
					Resources: api.ResourceRequirements{
						Limits: api.ResourceList{
							api.ResourceCPU:    resource.MustParse(types.K8sResFlumeCpuLimit),
							api.ResourceMemory: *resource.NewQuantity(types.K8sResFlumeMemLimt*(types.K8sResMemM), resource.DecimalSI),
						},
						Requests: api.ResourceList{
							api.ResourceCPU:    resource.MustParse(types.K8sResFlumeCpuRequest),
							api.ResourceMemory: *resource.NewQuantity(types.K8sResFlumeMemRequest*(types.K8sResMemM), resource.DecimalSI),
						},
					},
					VolumeMounts: volumeMounts,
				},
			},
			Volumes:       volumes,
			RestartPolicy: api.RestartPolicyAlways,
			DNSPolicy:     api.DNSClusterFirst,
			NodeSelector:  nodeSelector,
		},
	}

	bwAn, ingBwFlag, egBwFlag := GenBandWidth(p.BandWidth)
	if ingBwFlag || egBwFlag {
		podTmplSpec.ObjectMeta.Annotations = bwAn
	}
	return podTmplSpec
}

// genDeployment builds the Deployment for the request. The strategy is
// Recreate only when the request asks for it (case-insensitively); anything
// else results in RollingUpdate. One old revision is kept.
func (k8sCli *K8sCli) genDeployment() *extensions.Deployment {
	revisionHistoryLimit := int32(1)

	strategyType := extensions.RecreateDeploymentStrategyType
	if strings.ToLower(k8sCli.k8sParam.Strategy) != types.StrategyTypeRecreate {
		strategyType = extensions.RollingUpdateDeploymentStrategyType
	}

	deployment := &extensions.Deployment{
		ObjectMeta: k8sCli.genObjectMeta(),
		Spec: extensions.DeploymentSpec{
			Replicas: k8sCli.k8sParam.Replicas,
			Strategy: extensions.DeploymentStrategy{Type: strategyType},
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"name": k8sCli.k8sParam.ProjectName},
			},
			RevisionHistoryLimit: &revisionHistoryLimit,
			Template:             k8sCli.genTemplate(),
		},
	}

	// The JSON dump is for diagnostics only; a marshal failure is not fatal.
	if b, err := json.Marshal(deployment); err == nil {
		glog.Info(string(b))
	}
	return deployment
}

// genStatefulSet builds the StatefulSet for the request, including one volume
// claim template derived from the first stateful volume. Callers must make
// sure k8sParam.StatefulVolumes is non-empty (CreateKubeStatefulSet and
// UpdateKubeStatefulSet check this before calling).
func (k8sCli *K8sCli) genStatefulSet() *apps.StatefulSet {
	p := &k8sCli.k8sParam
	revisionHistoryLimit := int32(1)
	volume := p.StatefulVolumes[0]

	// The storage class is named "<cluster level>-<volume type>-sc".
	storageClassName := fmt.Sprintf("%s-%s-sc", p.ClusterLevel, volume.VolumeType)
	pvcName := fmt.Sprintf("%s-%s", p.ClusterLevel, p.ProjectName)

	accessMode := api.ReadWriteOnce
	if volume.VolumeType == "glusterfs" {
		accessMode = api.ReadWriteMany
	}

	strategyType := apps.StatefulSetUpdateStrategyType(apps.OnDeleteStatefulSetStrategyType)
	if strings.ToLower(p.Strategy) != types.StrategyTypeRecreate {
		strategyType = apps.StatefulSetUpdateStrategyType(apps.RollingUpdateStatefulSetStrategyType)
	}

	sts := &apps.StatefulSet{
		ObjectMeta: k8sCli.genObjectMeta(),
		Spec: apps.StatefulSetSpec{
			Replicas: p.Replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"name": p.ProjectName},
			},
			Template:             k8sCli.genTemplate(),
			ServiceName:          p.ProjectName,
			PodManagementPolicy:  apps.ParallelPodManagement,
			RevisionHistoryLimit: &revisionHistoryLimit,
			UpdateStrategy:       apps.StatefulSetUpdateStrategy{Type: strategyType},
			VolumeClaimTemplates: []api.PersistentVolumeClaim{
				{
					ObjectMeta: metav1.ObjectMeta{Name: pvcName},
					Spec: api.PersistentVolumeClaimSpec{
						StorageClassName: &storageClassName,
						AccessModes:      []api.PersistentVolumeAccessMode{accessMode},
						Resources: api.ResourceRequirements{
							Requests: api.ResourceList{
								api.ResourceStorage: *resource.NewQuantity(types.K8sResStorageG*(volume.VolumeSize), resource.DecimalSI),
							},
						},
					},
				},
			},
		},
	}

	// The JSON dump is for diagnostics only; a marshal failure is not fatal.
	if b, err := json.Marshal(sts); err == nil {
		glog.Info(string(b))
	}
	return sts
}

// CreateKubeDeployment creates the Deployment in the request's namespace and
// logs either the created object or the error.
func (k8sCli *K8sCli) CreateKubeDeployment(kubeClient *client.Clientset) error {
	dep, err := kubeClient.Deployments(k8sCli.k8sParam.NameSpace).Create(k8sCli.genDeployment())
	if err != nil {
		glog.Infof("create-deployment-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(dep); jerr == nil {
		glog.Infof("create deployment result>>>%s", string(b))
	}
	return nil
}

// CreateKubeStatefulSet creates the StatefulSet in the request's namespace.
// It returns an error, without calling the API, when the request carries no
// stateful volume.
func (k8sCli *K8sCli) CreateKubeStatefulSet(kubeClient *client.Clientset) error {
	if len(k8sCli.k8sParam.StatefulVolumes) == 0 {
		return fmt.Errorf("statefulset %v requires at least one stateful volume", k8sCli.k8sParam.ProjectName)
	}
	st, err := kubeClient.Apps().StatefulSets(k8sCli.k8sParam.NameSpace).Create(k8sCli.genStatefulSet())
	if err != nil {
		glog.Infof("create-statefulset-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(st); jerr == nil {
		glog.Infof("create statefulset result>>>%s", string(b))
	}
	return nil
}

// DeleteKubeDeployment deletes the Deployment and then, best effort, the
// ReplicaSets and Pods labelled name=<project name>. The ReplicaSets are
// listed before the Deployment is deleted. Failures to delete an individual
// ReplicaSet or Pod are logged but do not fail the call.
func (k8sCli *K8sCli) DeleteKubeDeployment(kubeClient *client.Clientset) error {
	namespace := k8sCli.k8sParam.NameSpace
	name := k8sCli.k8sParam.ProjectName
	selector := labels.Set(map[string]string{"name": name}).AsSelector().String()
	listOptions := metav1.ListOptions{LabelSelector: selector}

	rsList, err := kubeClient.ReplicaSets(namespace).List(listOptions)
	if err != nil {
		glog.Infof("get rs of %v err:%v", name, err)
		return err
	}

	err = kubeClient.Deployments(namespace).Delete(name, &metav1.DeleteOptions{})
	glog.Infof("delete deployment %v :%v", name, err)
	if err != nil {
		return err
	}

	for i := range rsList.Items {
		rsName := rsList.Items[i].Name
		if derr := kubeClient.ReplicaSets(namespace).Delete(rsName, &metav1.DeleteOptions{}); derr != nil {
			glog.Warningf("delete rs %v err:%v", rsName, derr)
		}
	}

	podList, err := kubeClient.Pods(namespace).List(listOptions)
	if err != nil {
		return err
	}
	for i := range podList.Items {
		podName := podList.Items[i].Name
		if derr := kubeClient.Pods(namespace).Delete(podName, &metav1.DeleteOptions{}); derr != nil {
			glog.Warningf("delete pod %v err:%v", podName, derr)
		}
	}
	return nil
}

// DeleteKubeStatefulSet deletes the StatefulSet and then, best effort, the
// Pods labelled name=<project name>. Failures to delete an individual Pod
// are logged but do not fail the call.
func (k8sCli *K8sCli) DeleteKubeStatefulSet(kubeClient *client.Clientset) error {
	namespace := k8sCli.k8sParam.NameSpace
	name := k8sCli.k8sParam.ProjectName
	selector := labels.Set(map[string]string{"name": name}).AsSelector().String()

	err := kubeClient.Apps().StatefulSets(namespace).Delete(name, &metav1.DeleteOptions{})
	glog.Infof("delete statefulset %v :%v", name, err)
	if err != nil {
		return err
	}

	podList, err := kubeClient.Pods(namespace).List(metav1.ListOptions{LabelSelector: selector})
	if err != nil {
		return err
	}
	for i := range podList.Items {
		podName := podList.Items[i].Name
		if derr := kubeClient.Pods(namespace).Delete(podName, &metav1.DeleteOptions{}); derr != nil {
			glog.Warningf("delete pod %v err:%v", podName, derr)
		}
	}
	return nil
}

// GetKubeDeployment fetches the Deployment named after the project.
func (k8sCli *K8sCli) GetKubeDeployment(kubeClient *client.Clientset) (*extensions.Deployment, error) {
	return kubeClient.Deployments(k8sCli.k8sParam.NameSpace).Get(k8sCli.k8sParam.ProjectName, metav1.GetOptions{})
}

// GetKubeStatefulSet fetches the StatefulSet named after the project.
func (k8sCli *K8sCli) GetKubeStatefulSet(kubeClient *client.Clientset) (*apps.StatefulSet, error) {
	return kubeClient.Apps().StatefulSets(k8sCli.k8sParam.NameSpace).Get(k8sCli.k8sParam.ProjectName, metav1.GetOptions{})
}

// UpdateKubeDeployment replaces the Deployment with a freshly generated one
// and logs either the updated object or the error.
func (k8sCli *K8sCli) UpdateKubeDeployment(kubeClient *client.Clientset) error {
	dep, err := kubeClient.Deployments(k8sCli.k8sParam.NameSpace).Update(k8sCli.genDeployment())
	if err != nil {
		glog.Infof("update-deployment-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(dep); jerr == nil {
		glog.Infof("update deployment result>>>%s", string(b))
	}
	return nil
}

// UpdateKubeStatefulSet replaces the StatefulSet with a freshly generated
// one. It returns an error, without calling the API, when the request carries
// no stateful volume.
func (k8sCli *K8sCli) UpdateKubeStatefulSet(kubeClient *client.Clientset) error {
	if len(k8sCli.k8sParam.StatefulVolumes) == 0 {
		return fmt.Errorf("statefulset %v requires at least one stateful volume", k8sCli.k8sParam.ProjectName)
	}
	sts, err := kubeClient.Apps().StatefulSets(k8sCli.k8sParam.NameSpace).Update(k8sCli.genStatefulSet())
	if err != nil {
		glog.Infof("update-statefulset-err>>>%s", err)
		return err
	}
	if b, jerr := json.Marshal(sts); jerr == nil {
		glog.Infof("update statefulset result>>>%s", string(b))
	}
	return nil
}

// ScaleApp sets the replica count of the resource of the given kind, named
// after the project, through the scale sub-resource.
func (k8sCli *K8sCli) ScaleApp(kind string, kubeClient *client.Clientset) error {
	scale := extensions.Scale{
		ObjectMeta: metav1.ObjectMeta{
			Name:      k8sCli.k8sParam.ProjectName,
			Namespace: k8sCli.k8sParam.NameSpace,
		},
		Spec: extensions.ScaleSpec{
			Replicas: k8sCli.k8sParam.Replicas,
		},
	}
	_, err := kubeClient.Scales(k8sCli.k8sParam.NameSpace).Update(kind, &scale)
	return err
}
package k8sengine import ( "encoding/json" "strings" "github.com/golang/glog" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/kubernetes/pkg/api" client "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" ) func (k8sCli *K8sCli) genKubeService() *api.Service { objectMeta := metav1.ObjectMeta{ Name: k8sCli.k8sParam.ProjectName, Namespace: k8sCli.k8sParam.NameSpace, } protocal := api.ProtocolTCP if strings.ToUpper(k8sCli.k8sParam.Protocol) == "UDP" { protocal = api.ProtocolUDP } spec := api.ServiceSpec{ Type: api.ServiceTypeClusterIP, SessionAffinity: api.ServiceAffinityNone, Selector: map[string]string{"name": k8sCli.k8sParam.ProjectName}, ClusterIP: "None", Ports: []api.ServicePort{ api.ServicePort{ TargetPort: intstr.IntOrString{ Type: intstr.Int, IntVal: k8sCli.k8sParam.TargetPort, }, Port: k8sCli.k8sParam.TargetPort, // NodePort:nodeport, Protocol: protocal, }, }, } src := &api.Service{ ObjectMeta: objectMeta, Spec: spec, } b, _ := json.Marshal(src) glog.Info(string(b)) return src } func (k8sCli *K8sCli) CreateKubeService(kubeClient *client.Clientset) error { src := k8sCli.genKubeService() nsrc, err := kubeClient.Services(k8sCli.k8sParam.NameSpace).Create(src) glog.Info("create-service-err>>>>%s", err) if err == nil { b, _ := json.Marshal(nsrc) glog.Info("create service resutl>>>%s", string(b)) } else { glog.Error(err) } return err } func (k8sCli *K8sCli) DeleteKubeService(kubeClient *client.Clientset) error { err := kubeClient.Services(k8sCli.k8sParam.NameSpace).Delete(k8sCli.k8sParam.ProjectName, &metav1.DeleteOptions{}) glog.Info("delete service %s:%s", k8sCli.k8sParam.ProjectName, err) return err } func (k8sCli *K8sCli) UpdateKubeService(kubeClient *client.Clientset) error { osrc, err := kubeClient.Services(k8sCli.k8sParam.NameSpace).Get(k8sCli.k8sParam.ProjectName, metav1.GetOptions{}) if err != nil { glog.Error("get svc %s err:%s", k8sCli.k8sParam.ProjectName, err.Error()) } src := 
k8sCli.genKubeService() src.ObjectMeta.ResourceVersion = osrc.ObjectMeta.ResourceVersion nsrc, err := kubeClient.Services(k8sCli.k8sParam.NameSpace).Update(src) glog.Info("update-service-err>>>>%s", err) if err == nil { glog.Info("update service resutl>>>%s", nsrc) } else { glog.Error(err) } return err }
package k8sengine import ( "errors" "fmt" "strings" "wdcp-build/pkg/api/unversioned" "wdcp-build/pkg/db" "wdcp-build/pkg/util" "github.com/golang/glog" )
//生成集群唯一的服务端口 func GenMappingPort(param *unversioned.K8sParam) (int32, error) { glog.Info("genMappingPort1>>>%s", param) mysqlOptions := util.RunOption.MySQLOptions dbCli := db.MySQLCli{ Options: mysqlOptions, } clusterName := "cluster_" + param.ClusterLevel mp := db.MappingPort{ AppName: param.ProjectName, ClusterName: clusterName, Tenant: param.NameSpace, } glog.Info("query_mp :%s", mp) glog.Info("query_mp :%s,%s,%s", mp.ClusterName, mp.Tenant, mp.AppName) mpp, err := dbCli.GetMappingPort(mp) glog.Info("mpp :%s", mpp) // if err == nil && unsafe.Sizeof(mpp) != 0 { if err == nil && mpp.MappingPort != 0 { glog.Info("get mappingport :%d", mpp.MappingPort) return mpp.MappingPort, err } isVUrlReq := len(strings.TrimSpace(param.Protocol)) == 0 || param.TargetPort == 0 if err == nil && mpp.MappingPort == 0 && isVUrlReq { return mpp.MappingPort, errors.New("Not Found the service port") } var mappingPort int32 if mappingPorts, err := dbCli.MappingPorts(clusterName); err == nil { if len(mappingPorts) == 0 { // glog.Info("all mappingports size is 0") mappingPort = util.RunOption.PortRange.MinPort err = nil } else { for index, mp := range mappingPorts { // glog.Info("index:%s", index) // glog.Info("curr-mapping-port:%d", mp.MappingPort) j := index + 1 // glog.Info("j=index+1=%d", j) // glog.Info("len of mappingPorts:%d", len(mappingPorts)) if j < len(mappingPorts) { // glog.Info(" mappingPorts[j].MappingPort=%d", mappingPorts[j].MappingPort) // glog.Info("int32(mp.MappingPort+1)=%d", int32(mp.MappingPort+1)) // glog.Info(mappingPorts[j].MappingPort != int32(mp.MappingPort+1)) if mappingPorts[j].MappingPort != int32(mp.MappingPort+1) { mappingPort = int32(mp.MappingPort + 1) err = nil } else { continue } } else { if int32(mp.MappingPort+1) > util.RunOption.PortRange.MaxPort { mappingPort = 0 err = errors.New(fmt.Sprintf("Out of port range %d - %d", util.RunOption.PortRange.MinPort, util.RunOption.PortRange.MaxPort)) } else { mappingPort = 
int32(mappingPorts[len(mappingPorts)-1].MappingPort + 1) err = nil } } } } } else { mappingPort = 0 glog.Error("list existed mappingports err:%s", err) err = err } if mappingPort != 0 && err == nil && !isVUrlReq { glog.Info("genMappingPort2>>>%s", param) SaveMappingPort(dbCli, param, mappingPort, clusterName) } glog.Info("final genmapping port is %d", mappingPort) return mappingPort, err } func SaveMappingPort(dbCli db.MySQLCli, param *unversioned.K8sParam, mappingPort int32, clusterName string) { mp := db.MappingPort{ AppName: param.ProjectName, Protocal: param.Protocol, ContainerPort: param.TargetPort, MappingPort: mappingPort, ClusterName: clusterName, Tenant: param.NameSpace, } _, err := dbCli.CreateMappingPorts(mp) if err != nil { glog.Error("save mappingport %d:%s", mappingPort, err) } }
相关文章推荐
- docker k8s二次开发(一) 注册自定义api
- 使用python的docker-py实现docker的api操作 推荐
- FLASK (CURRENCY)汇率换算api JSON数据读取显示实现
- Docker 实现浏览器里开发Android应用的功能
- netty实现http api功能
- Linux网络编程基础API(多线程实现)
- FansMail:邮件发送标准API与技术实现(Java)
- Jenkins+Svn+Docker+SpringCloud 实现可持续自动化微服务
- 如何实现RESTful Web API的身份验证
- api导出excel的四种实现方法
- 通过Docker、Alpine Linux和Unbound实现DNS服务器托管
- docker registry接入ceph Swift API
- ArcGIS API For IOS 实现图层显示控制
- jQuery.API源码深入剖析以及应用实现(4) - 选择器篇(下)
- PHP中Restful api 错误提示返回值实现思路
- 基于GoogleMap,Mapabc,51ditu,VirtualEarth,YahooMap Api接口的Jquery插件的通用实现(含源代码下载)
- Spring Cloud与微服务学习总结(3)——认证鉴权与API权限控制在微服务架构中的设计与实现(一)
- python调用zabbix api 实现主机添加等功能
- Tcl脚本调用高层API实现仪表使用和主机创建配置的自动化测试用例
- 关于微博服务端API的OAuth认证实现