Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)
2017-07-18 00:00
621 查看
Spring Cloud Data Flow--大数据操作工具,作为Spring XD的替代产品,它是一个混合计算模型,结合了流数据与批量数据的处理方式。为数据微服务提供了业务流程,包括长期的 Spring Cloud Stream 应用程序和短期的 Spring Cloud Task 应用程序。
之前很少写博客,主要是国内相关资料少之又少,踩了很多坑,谷歌也没多大帮助,深知那种无助的感觉,所以记录一下,与喜欢研究技术的朋友分享一下,希望此文会有小小的帮助,微服务的明天会更好
1.通过vagrant安装DCOS,我们会得到一个Marathon端点url:http://m1.dcos/service/marathon,留作配置用,如果是其它比如CLI、GUI的安装方式,也可配置其服务的ip地址。service安装这里不做过多介绍,可以通过dcos的universe安装,也可以通过json文件安装
2.安装mysql可以通过DCOS进行安装,如果你有一个mysql数据库亦可不必安装
3.安装rabbitmq,如果你有一个rabbitmq服务器亦可不必安装,此处注意,对于springboot应用,应当在应用配置文件中配置mq的用户名密码,如:
4.安装redis,如果你有一个redis服务器亦可不必安装
5.安装chronos,如果你有一个chronos服务器亦可不必安装,速度慢说明拉取镜像慢,此处共享下百度云链接:http://pan.baidu.com/s/1dFaQvSX 密码:3j9c,下载到agent节点直接docker load < chronos.tar,docker images查看即可,重新执行,速度杠杠滴。
6.获取springclouddataflow的json配置文件
之前很少写博客,主要是国内相关资料少之又少,踩了很多坑,谷歌也没多大帮助,深知那种无助的感觉,所以记录一下,与喜欢研究技术的朋友分享一下,希望此文会有小小的帮助,微服务的明天会更好
1.通过vagrant安装DCOS,我们会得到一个Marathon端点url:http://m1.dcos/service/marathon,留作配置用,如果是其它比如CLI、GUI的安装方式,也可配置其服务的ip地址。service安装这里不做过多介绍,可以通过dcos的universe安装,也可以通过json文件安装
2.安装mysql可以通过DCOS进行安装,如果你有一个mysql数据库亦可不必安装
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @mysql.json -H "Content-type: application/json"
3.安装rabbitmq,如果你有一个rabbitmq服务器亦可不必安装,此处注意,对于springboot应用,应当在应用配置文件中配置mq的用户名密码,如:
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @rabbitmq.json -H "Content-type: application/json"
4.安装redis,如果你有一个redis服务器亦可不必安装
curl -X POST http://m1.dcos/service/marathon/v2/apps -d @redis.json -H "Content-type: application/json"
5.安装chronos,如果你有一个chronos服务器亦可不必安装,速度慢说明拉取镜像慢,此处共享下百度云链接:http://pan.baidu.com/s/1dFaQvSX 密码:3j9c,下载到agent节点直接docker load < chronos.tar,docker images查看即可,重新执行,速度杠杠滴。
dcos package install chronos
6.获取springclouddataflow的json配置文件
$ wget https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow-server-mesos/v1.0.0.RELEASE/src/etc/marathon/scdf-server.json[/code] 7.直接贴上我的配置文件
{
"id": "/spring-cloud-data-flow",
"instances": 1,
"cpus": 2,
"mem": 4000,
"disk": 3000,
"gpus": 0,
"backoffSeconds": 1,
"backoffFactor": 1.15,
"maxLaunchDelaySeconds": 3600,
"container": {
"type": "DOCKER",
"docker": {
"image": "springcloud/spring-cloud-dataflow-server-mesos:latest",
"network": "BRIDGE",
"portMappings": [
{
"containerPort": 9393,
"hostPort": 0,
"servicePort": 10000,
"protocol": "tcp",
"name": "default"
}
],
"privileged": false,
"forcePullImage": false
}
},
"healthChecks": [
{
"gracePeriodSeconds": 120,
"intervalSeconds": 60,
"timeoutSeconds": 20,
"maxConsecutiveFailures": 0,
"portIndex": 0,
"path": "/management/health",
"protocol": "HTTP",
"ignoreHttp1xx": false
}
],
"upgradeStrategy": {
"minimumHealthCapacity": 1,
"maximumOverCapacity": 1
},
"unreachableStrategy": {
"inactiveAfterSeconds": 300,
"expungeAfterSeconds": 600
},
"killSelection": "YOUNGEST_FIRST",
"requirePorts": true,
"env": {
"JDBC_DRIVER": "org.mariadb.jdbc.Driver",
"MESOS_CHRONOS_URI": "http://172.16.1.77:10105",
"REDIS_HOST": "172.16.1.61",
"RABBITMQ_PORT": "6392",
"MESOS_MARATHON_URI": "http://m1.dcos/service/marathon",
"REDIS_PORT": "6379",
"JDBC_PASSWORD": "1234321",
"JDBC_URL": "jdbc:mysql://172.16.1.145:3306/test",
"SPRING_APPLICATION_JSON": "{\"spring.cloud.deployer.mesos.marathon.apiEndpoint\":\"${MESOS_MARATHON_URI}\",\"spring.cloud.deployer.mesos.chronos.apiEndpoint\":\"${MESOS_CHRONOS_URI}\",\"spring.datasource.url\":\"${JDBC_URL}\",\"spring.datasource.driverClassName\":\"${JDBC_DRIVER}\",\"spring.datasource.username\":\"${JDBC_USERNAME}\",\"spring.datasource.password\":\"${JDBC_PASSWORD}\",\"spring.datasource.testOnBorrow\":true,\"spring.datasource.validationQuery\":\"SELECT 1\",\"spring.redis.host\":\"${REDIS_HOST}\",\"spring.redis.port\":\"${REDIS_PORT}\",\"spring.cloud.deployer.mesos.marathon.environmentVariables\":\"SPRING_RABBITMQ_HOST=${RABBITMQ_HOST},SPRING_RABBITMQ_PORT=${RABBITMQ_PORT}\",\"spring.cloud.deployer.mesos.dcos.authorizationToken\":\"${DCOS_TOKEN}\",\"spring.cloud.config.enabled\":false,\"spring.freemarker.checkTemplateLocation\":false,\"spring.cloud.deployer.mesos.marathon.memory\":\"3000\",\"spring.dataflow.embedded.database.enabled\":false}",
"RABBITMQ_HOST": "172.16.1.77",
"JDBC_USERNAME": "root"
}
}
这里对几点做说明:(1)"image": "springcloud/spring-cloud-dataflow-server-mesos:latest",对应的agent节点docker中的镜像,直接运行的话,首先它会下载镜像,由于你懂得,速度慢成翔,你可
以docker去pull喝着大茶慢慢等,如果你别处有此镜像也可导入到你的agent节点的docker中。此处共享下导出的镜像百度云地址链接:http://pan.baidu.com/s/1hstbj5E
密码:56nk,下载后直接docker load < springdataflow.tar,如果你有搭建docker的本地远程仓库,也可将镜像打个tag推送到仓库,agent的docker直接从仓库中pull。
(2)配置marathon还有chronos地址,也可以配置ip地址,浏览器能正常访问即可"MESOS_MARATHON_URI": "http://m1.dcos/service/marathon", "MESOS_CHRONOS_URI": "http://m1.dcos/service/chronos",
ip可以通过service的detail中查看,logs可以看到其运行日志
(3)配置mysql"JDBC_URL": "jdbc:mysql://", "JDBC_DRIVER": "org.mariadb.jdbc.Driver", "JDBC_USERNAME": "", "JDBC_PASSWORD": "",
(4)配置mq"RABBITMQ_HOST": "", "RABBITMQ_PORT": "",
(5)配置redis"REDIS_HOST": "", "REDIS_PORT": "",
(6)如果DCOS是开启权限认证,则需要配置DCOS_TOKEN
获取token方法如下:curl https://downloads.dcos.io/binaries/cli/linux/x86-64/dcos-1.9/dcos -o dcos && sudo mv dcos /usr/local/bin && sudo chmod +x /usr/local/bin/dcos && dcos config set core.dcos_url http://XXXXXXXXX && dcos
安装dcos的cli,配置你的master地址,然后dcos auth login 出现一个地址,http://XXXXXX/login?redirect_uri=urn:ietf:wg:oauth:2.0:oob贴到浏览器获取一个token
然后复制到控制台,登录成功。然后 dcos config show core.dcos_acs_token 出现的token可以copy到DCOS_TOKEN
还可以配置maven地址,可参阅文档:http://docs.spring.io/spring-cloud-dataflow/docs/1.2.2.BUILD-SNAPSHOT/reference/htmlsingle/#arch-data-flow-server,但是注册app的时候,直接配置jar的地址或者maven地址,都会出现异常,看了下源码,在deploy的时候jar会生成一个临时镜像,然后通过fegin去请求marathon的api,但是请求一直会报异常null或者:reason不啦不啦的,所以我是把每一个流的jar都通过docker打包镜像push到远程仓库,配置本地docker仓库地址去拉取镜像,还有需要注意的地方就是注册app的名称还有stream的名称的时候全部用小写,不然fegin请求的时候也会报异常。若有大神知道jar或者maven配置的正确方法,请分享与我,谢谢。
8.然后就可以通过json文件运行了,页面也是可以操作的
查看日志启动成功
9.访问springdataflow的web端http://XXXX:AAA/dashboard,亦可通过spring-cloud-dataflow-shell
来注册app或者stream,此处不再赘述。
10.注册app
11.创建数据流
注意:至少得有一个source一个sink,由于资源有限,processor暂时不加入(ps:跑应用真的很消耗资源)
12.应用数据流
然后就可以取services去查看,一个jar应该会启动一个实例
大功告成哈哈哈哈哈哈哈哈哈
下面附上部分代码
source:
applicationimport java.util.Date; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.context.annotation.Bean; import org.springframework.integration.annotation.InboundChannelAdapter; import org.springframework.integration.annotation.Poller; import org.springframework.integration.core.MessageSource; import org.springframework.messaging.support.MessageBuilder; @EnableBinding(Source.class) @SpringBootApplication public class LoggingSourceApplication { @Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource<Long> timeMessageSource() { System.out.println(new Date() +"======================logging-source========================== execued"); return () -> { System.out.println(new Date() + "*****logging-source****** send"); return MessageBuilder.withPayload(new Date().getTime()).build(); }; } public static void main(String[] args) { SpringApplication.run(LoggingSourceApplication.class, args); } }
propertiesspring.rabbitmq.host=172.16.3.183 spring.rabbitmq.username=admin spring.rabbitmq.password=admin # 本机启动测试需要以下配置 spring.cloud.stream.default.contentType=application/json spring.cloud.stream.bindings.output.destination=source-log # 默认情况下,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭, # 对应的连接关闭的时候,该队列也会消失。为此,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。 # 只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = logistic # 对应的队列就是持久化 spring.cloud.stream.bindings.output.group=logTest spring.cloud.stream.bindings.output.binder=rabbitMq1 spring.cloud.stream.binders.rabbitMq1.type=rabbit spring.cloud.stream.default-binder=rabbitMq1 # rabbitMQ服务器地址 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183 # rabbitMQ服务器端口 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/
pom<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.4.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
sink:
applicationimport java.util.Date; import java.util.Map; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.integration.annotation.MessageEndpoint; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.handler.annotation.Headers; import org.springframework.messaging.handler.annotation.Payload; @EnableBinding(Sink.class) @SpringBootApplication public class LoggingSinkApplication { @MessageEndpoint public static class LoggingMessageEndpoint { @ServiceActivator(inputChannel = Sink.INPUT) public void logIncomingMessages(@Payload String msg, @Headers Map<String, Object> headers) { System.out.println(new Date() + "***********logging-sink**************"+ msg); headers.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue())); } } @StreamListener(Sink.INPUT) public void loggerSink(String date) { System.out.println("logging-sink Received: " + date); } @Payload public static void main(String[] args) { SpringApplication.run(LoggingSinkApplication.class, args); } }
propertiesspring.rabbitmq.host=172.16.3.183 spring.rabbitmq.username=admin spring.rabbitmq.password=admin #本地测试需要配置 server.port=8090 spring.cloud.stream.default.contentType=application/json spring.cloud.stream.bindings.input.destination=source-log # 默认情况下,Spring Cloud Stream 会在 RabbitMQ 中创建一个临时的队列,程序关闭, # 对应的连接关闭的时候,该队列也会消失。为此,我们需要一个持久化的队列,并且指定一个分组,用于保证应用服务的缩放。 # 只需要在消费者端的 binding 添加配置项 spring.cloud.stream.bindings.[channelName].group = logistic # 对应的队列就是持久化 spring.cloud.stream.bindings.input.group=logTest spring.cloud.stream.bindings.input.binder=rabbitMq1 spring.cloud.stream.binders.rabbitMq1.type=rabbit spring.cloud.stream.default-binder=rabbitMq1 # rabbitMQ服务器地址 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.host=172.16.3.183 # rabbitMQ服务器端口 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.port=5672 spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.username=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.password=admin spring.cloud.stream.binders.rabbitMq1.environment.spring.rabbitmq.virtual-host=/
pom<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.4.4.RELEASE</version> <relativePath /> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Camden.SR5</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
相关文章推荐
- [置顶] Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)
- bug宝典hadoop篇 org.apache.hadoop.hdfs.server.datanode.DataNode: Initialization failed for Block pool <
- Spring Cloud Data Flow 简介
- 【SFA官方译文】:Spring Cloud Data Flow中的ETL
- DataStage On Cloud,构建云端的企业数据集成平台
- org.apache.hadoop.hdfs.server.datanode.DataNode: Exception in receiveBlock for block
- spring cloud data flow demo
- Spring5学习(二)-spring projects之Spring Cloud Data Flow
- 如何在Ubuntu Server 16.04上构建Apache Mesos
- FATAL org.apache.hadoop.hdfs.server.datanode.DataNode: Initialization failed for Block pool <registe
- maven构建spring mvc + spring data jpa+ sql server 配置
- org.apache.hadoop.hdfs.server.datanode.DataNode: Exception in BPOfferService for Block pool
- ArcGIS 10.1 for Server 新特性简要介绍(构建云GIS平台的基石)
- Spring Cloud Data flow
- Spring for Apache Hadoop 1.0
- Spring Cloud构建微服务架构(六)高可用服务注册中心
- SpringCloud--构建高可用Eureka注册中心
- 福利!送书《深入理解Spring Cloud与微服务构建》5本
- Scale-Out File Server for Application Data Overview
- Oracle Database 11.2.0.1 for RHEL5 Server(包括x86和x86_64平台)的静默安装指南