spring cloud data flow demo
2017-04-05 11:27
465 查看
一.环境准备
a)Java1.8及以上 b)关系型数据库用来存储stream task和程序的状态(默认使用内嵌的H2) c)Redis d)Message broker (rabbitMq,kafka) e)Maven f)为了方便spring组件的使用建议使用(sts),以下操作基于sts
二.Data Flow Server
Spring Cloud Data Flow 支持多种运行环境(Cloud Foundry 、Apache YARN 、Kubernetes 、Apache Mesos、Local Server for development )本例中使用spring 的 LocalServer. 1.新建一个spring start project 2.输入项目信息 3.选择spring版本及项目依赖 Boot version : 1.4.4 项目依赖选择 Local Data Flow server 选下一步等待maven包及项目构建完成 4.在 spring boot main class上添加@EnableDataFlowServer 注解
@EnableDataFlowServer @SpringBootApplication public class DfServerApplication { public static void main(String[] args) { SpringApplication.run(DfServerApplication.class, args); } }
5.现在Data Flow服务器就搭建好了 (需要redis的支持) 6.然后到项目路径下边 执行 mvn spring-boot:run 服务器以端口9393启动
三.Data Flow shell
a)项目构建与Data Flow Server一致 b)第三步选择依赖的时候选择 data flow shell c) 修改代码添加@EnableDataFlowShell注解
@EnableDataFlowShell @SpringBootApplication public class DfShellApplication { public static void main(String[] args) { SpringApplication.run(DfShellApplication.class, args); } }
d) 项目路径下执行 mvn spring-boot:run dataflow config server http://localhost:9393 若server 在本地 且 已经启动则会直接连接。
四.创建streams
a)创建sourcei.项目创建与server一致 ii.第三步选择依赖的时候选择 stream rabbit(若你使用的是kafka 择选择kafka) iii.修改代码
@EnableBinding(Source.class) @SpringBootApplication public class LoggingSourceApplication { @Bean @InboundChannelAdapter( value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1") ) public MessageSource<Long> timeMessageSource() { System.out.println(new Date() +"======================logging-source========================== execued"); return () -> MessageBuilder.withPayload(new Date().getTime()).build(); } public static void main(String[] args) { SpringApplication.run(LoggingSourceApplication.class, args); } }
iv.到项目路径 执行mvn clean install
b). 创建processor
i.所有步骤跟source一样 ii.代码修改
@EnableBinding(Processor.class) @SpringBootApplication public class LoggingProcessorApplication { @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(Long timestamp) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:yy"); String date = dateFormat.format(timestamp); System.out.println(date + "------------------------------logging-proccessor------------------------------- executed"); return date; } public static void main(String[] args) { SpringApplication.run(LoggingProcessorApplication.class, args); } }
c). 创建sink
i.所有步骤跟source创建一样 ii.代码修改
@EnableBinding(Sink.class) @SpringBootApplication public class LoggingSinkApplication { @MessageEndpoint public static class LoggingMessageEndpoint { @ServiceActivator(inputChannel = Sink.INPUT) public void logIncomingMessages(@Payload String msg, @Headers Map<String, Object> headers) { System.out.println("logging-sink**************"+ msg); headers.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue())); } } @StreamListener(Sink.INPUT) public void loggerSink(String date) { System.out.println("logging-sink Received: " + date); } public static void main(String[] args) { SpringApplication.run(LoggingSinkApplication.class, args); } }
五.注册Stream app
执行 app register --name “demo” --type type --uri maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version> 注册完毕后 可执行 app list 查看注册列表
六.创建stream并部署
1.创建stream执行stream create --name “name” --definiation ‘a | b | c’ 然后执行 stream list 就可以看到刚才定义的stream
2.部署stream
执行命令 stream deploy --name ‘name’
七.创建Task
a)项目创建步骤同source b)第三步选择依赖时选择 Cloud Task c).修改相关代码
@EnableTask @EnableBatchProcessing @SpringBootApplication public class MyTaskApplication { public static void main(String[] args) { SpringApplication.run(MyTaskApplication.class, args); } } 创建jobConfiguration @Configuration public class JobConfiguration { private static Log logger = LogFactory.getLog(JobConfiguration.class); @Autowired public JobBuilderFactory jobBuilderFactory; @Autowired public StepBuilderFactory stepBuilderFactory; @Bean public Job job() { return jobBuilderFactory.get("job") .start(stepBuilderFactory.get("jobStep1") .tasklet(new Tasklet() { @Override public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception { logger.info("my Job was run"); return RepeatStatus.FINISHED; } }).build()).build(); } }
d).到项目路径下执行 mvn spring-boot:run
八.部署task
a)注册appapp register --name my-job --type task --uri maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version> 执行app list可以看到注册的task程序
b).创建task
task create myjob --difination ‘appname’ task list 可以看到刚刚创建的 task
c).运行task
task launch ‘taskname’ 然后执行 task execution list 就可以看到执行过的task
具体文档加图示 可下载文档查看
spring cloud data flow demo相关文章推荐
- Spring Cloud Data Flow 简介
- Spring5学习(二)-spring projects之Spring Cloud Data Flow
- 【SFA官方译文】:Spring Cloud Data Flow中的ETL
- Spring Cloud Data Flow Server for Apache Mesos 适用于mesos平台的springcloud数据流服务器(DCOS构建)
- Spring Cloud Data flow
- Spring Web Flow 入门demo(二)与业务结合 附源码
- SpringCloud(十四):Feign的demo之解决超时问题
- Spring Web Flow 入门demo(一)简单页面跳转 附源码
- Spring Data Redis简介以及项目Demo,RedisTemplate和 Serializer详解
- spring-boot-starter-data-redis(spring cloud 操作redis) RedisTemplate
- SpringCloud SpringBoot mybatis 分布式微服务(九)Spring Boot中使用Spring-data-jpa让数据访问更简单
- Spring Web Flow 入门demo(三)嵌套流程与业务结合 附源码
- SpringData JPA查询分页demo
- Spring Web Flow 入门demo(二)与业务结合 附源码
- Spring Data Redis简介以及项目Demo,RedisTemplate和 Serializer详解
- Spring Data Redis简介以及项目Demo,RedisTemplate和 Serializer详解
- SpringCloud(十一):Feign的demo之添加Feign
- Spring Web Flow 入门demo(一)简单页面跳转 附源码
- 【微框架】之一:从零开始,轻松搞定SpringCloud微服务系列--开山篇(spring boot 小demo)
- Spring Data Redis简介以及项目Demo,RedisTemplate和 Serializer详解