
Spring Cloud Data Flow Demo

2017-04-05 11:27

I. Environment Preparation

a) Java 1.8 or later
b) A relational database to store the state of streams, tasks, and apps (the embedded H2 database is used by default)
c) Redis
d) A message broker: RabbitMQ or Kafka (a Docker-based way to start Redis and RabbitMQ locally is sketched after this list)
e) Maven
f) Spring Tool Suite (STS) is recommended for convenient use of the Spring tooling; the steps below assume STS
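If Redis and RabbitMQ are not already installed, one convenient way to run them locally is Docker. A minimal sketch, assuming Docker is available and using the official images with their default ports:

docker run -d -p 6379:6379 redis
docker run -d -p 5672:5672 rabbitmq:3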


II. Data Flow Server

Spring Cloud Data Flow supports several runtimes (Cloud Foundry, Apache YARN, Kubernetes, Apache Mesos, and a Local Server for development). This example uses Spring's Local Server.
1. Create a new Spring Starter Project
2. Enter the project information
3. Choose the Spring Boot version and the project dependencies
Boot version: 1.4.4
Select Local Data Flow Server as the dependency
Click Next and wait for Maven to download the dependencies and build the project
4. Add the @EnableDataFlowServer annotation to the Spring Boot main class


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.server.EnableDataFlowServer;

// Turns this Spring Boot application into a local Data Flow Server
@EnableDataFlowServer
@SpringBootApplication
public class DfServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(DfServerApplication.class, args);
    }
}


5. The Data Flow Server is now set up (it requires a running Redis instance; see the properties sketch below if Redis or RabbitMQ is not on the default host and port)
6. From the project directory, run mvn spring-boot:run; the server starts on port 9393
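If Redis or the message broker is not running on localhost with the default port, the connection settings can be overridden in application.properties. A minimal sketch using standard Spring Boot property names (adjust the hosts and ports to your environment):

spring.redis.host=localhost
spring.redis.port=6379
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672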


III. Data Flow Shell

a) Create the project the same way as the Data Flow Server
b) In step 3, select Data Flow Shell as the dependency
c) Modify the code to add the @EnableDataFlowShell annotation


import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.dataflow.shell.EnableDataFlowShell;

// Turns this Spring Boot application into an interactive Data Flow shell
@EnableDataFlowShell
@SpringBootApplication
public class DfShellApplication {

    public static void main(String[] args) {
        SpringApplication.run(DfShellApplication.class, args);
    }
}


d) From the project directory, run mvn spring-boot:run
In the shell, run dataflow config server http://localhost:9393; if the server is local and already running, the shell connects to it directly.
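If the shell does not connect automatically (for example, the server is on another host or port), it can be targeted explicitly from the shell prompt. A rough example of the interaction (the exact prompt text may vary by version):

server-unknown:>dataflow config server http://localhost:9393
dataflow:>app list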


IV. Creating Streams

a) Create the source

i. Create the project the same way as the server
ii. In step 3, select Stream Rabbit as the dependency (if you are using Kafka, select Stream Kafka instead)
iii. Modify the code


import java.util.Date;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.support.MessageBuilder;

@EnableBinding(Source.class)
@SpringBootApplication
public class LoggingSourceApplication {

    // Polled every 10 seconds, emitting the current timestamp on the output channel
    @Bean
    @InboundChannelAdapter(value = Source.OUTPUT,
            poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
    public MessageSource<Long> timeMessageSource() {
        System.out.println(new Date() + " ====================== logging-source ====================== executed");
        return () -> MessageBuilder.withPayload(new Date().getTime()).build();
    }

    public static void main(String[] args) {
        SpringApplication.run(LoggingSourceApplication.class, args);
    }
}


iv. From the project directory, run mvn clean install


b) Create the processor

i. All steps are the same as for the source
ii. Modify the code


import java.text.DateFormat;
import java.text.SimpleDateFormat;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.integration.annotation.Transformer;

@EnableBinding(Processor.class)
@SpringBootApplication
public class LoggingProcessorApplication {

    // Transforms the incoming timestamp (milliseconds) into a formatted date string
    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public Object transform(Long timestamp) {
        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd hh:mm:ss");
        String date = dateFormat.format(timestamp);
        System.out.println(date + " ------------------ logging-processor ------------------ executed");
        return date;
    }

    public static void main(String[] args) {
        SpringApplication.run(LoggingProcessorApplication.class, args);
    }
}


c) Create the sink

i. All steps are the same as for the source
ii. Modify the code


import java.util.Map;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;

@EnableBinding(Sink.class)
@SpringBootApplication
public class LoggingSinkApplication {

    // Spring Integration style: logs each incoming message along with its headers
    @MessageEndpoint
    public static class LoggingMessageEndpoint {
        @ServiceActivator(inputChannel = Sink.INPUT)
        public void logIncomingMessages(@Payload String msg, @Headers Map<String, Object> headers) {
            System.out.println("logging-sink************** " + msg);
            headers.entrySet().forEach(e -> System.out.println(e.getKey() + '=' + e.getValue()));
        }
    }

    // Spring Cloud Stream style: logs the received payload
    @StreamListener(Sink.INPUT)
    public void loggerSink(String date) {
        System.out.println("logging-sink Received: " + date);
    }

    public static void main(String[] args) {
        SpringApplication.run(LoggingSinkApplication.class, args);
    }
}


V. Registering the Stream Apps

Run app register --name "demo" --type <type> --uri maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>, where <type> is source, processor, or sink.
Once registration is done, run app list to see the registered apps.
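For the three apps built above, registration might look like the following, assuming each app was installed into the local Maven repository with mvn clean install; the group IDs, artifact IDs, and versions here are hypothetical, so replace them with your own:

app register --name logging-source --type source --uri maven://com.example:logging-source:0.0.1-SNAPSHOT
app register --name logging-processor --type processor --uri maven://com.example:logging-processor:0.0.1-SNAPSHOT
app register --name logging-sink --type sink --uri maven://com.example:logging-sink:0.0.1-SNAPSHOT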


VI. Creating and Deploying a Stream

1. Create the stream

Run stream create --name "name" --definition 'a | b | c'
Then run stream list to see the stream you just defined.
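Using the hypothetical app names registered above, the definition could look like this:

stream create --name logdemo --definition 'logging-source | logging-processor | logging-sink'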


2. Deploy the stream

Run stream deploy --name 'name'
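Continuing the example:

stream deploy --name logdemo

Once deployed, the local server launches the three apps, and the sink app's log should show the formatted timestamps emitted by the source every ten seconds.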


VII. Creating a Task

a) Create the project the same way as the source
b) In step 3, select Cloud Task as the dependency
c) Modify the code


import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.task.configuration.EnableTask;

// A Spring Cloud Task application that runs a Spring Batch job
@EnableTask
@EnableBatchProcessing
@SpringBootApplication
public class MyTaskApplication {

    public static void main(String[] args) {
        SpringApplication.run(MyTaskApplication.class, args);
    }
}
Create a JobConfiguration class:
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class JobConfiguration {

    private static Log logger = LogFactory.getLog(JobConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    // Defines a single-step job whose tasklet just writes a log line
    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(stepBuilderFactory.get("jobStep1")
                        .tasklet(new Tasklet() {

                            @Override
                            public RepeatStatus execute(StepContribution contribution,
                                    ChunkContext chunkContext) throws Exception {
                                logger.info("my Job was run");
                                return RepeatStatus.FINISHED;
                            }
                        }).build())
                .build();
    }
}


d) From the project directory, run mvn spring-boot:run


VIII. Deploying the Task

a) Register the app

app register --name my-job --type task --uri maven://<groupId>:<artifactId>[:<extension>[:<classifier>]]:<version>
Run app list to see the registered task app.
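With hypothetical Maven coordinates for the task built above, the command might look like this (replace the coordinates with your own):

app register --name my-job --type task --uri maven://com.example:my-task:0.0.1-SNAPSHOT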


b) Create the task

task create myjob --definition 'appname'
Run task list to see the task you just created.
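Continuing the example with the app registered as my-job:

task create myjob --definition 'my-job'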


c) Run the task

task launch 'taskname'
Then run task execution list to see the tasks that have been executed.

