您的位置:首页 > 大数据

大数据正式京淘13

2018-01-29 23:30 246 查看

大数据正式13

定时任务

防止恶意订单

在订单提交之后,没有支付,但是订单没有生成效益,却减少了库存,如果大量生成这种订单,库存到0,无法继续购买

解决方案

虚拟商品数量:这个一直减,不是太好--适合紧急解决

引入定时任务,超时未支付订单自动回库,库存自动回退

电商:一天

解决技术

Timer的API

插件:石英钟

图解关系



使用

本身就是一个jar包

核心组件【Job,JobDetail具体处理,Scheduler调度器,Trigger触发器】

JobDetail+Job

继承自石英钟的父类,启动容器后,一旦加载JobDetail的实例,其中JobDetail下的多个job逻辑需要编写代码

调度器:负责调用一个JobDetail的的时间触发器

触发器:管理触发当前一个石英钟逻辑的JobDetail的组件,时间计算表达式,任何触发任务执行是由触发器计算管理的

简单解释

Job

表示一个工作,要执行的具体内容。

此接口中只有一个方法void execute(JobExecutionContext context)

JobDetail

JobDetail表示一个具体的可执行的调度程序,Job是这个可执行程调度程序所要执行的内容,另外JobDetail还包含了这个任务调度的方案和策略。

Trigger

代表一个调度参数的配置,什么时候去调。

Scheduler

代表一个调度容器,一个调度容器中可以注册多个JobDetail和Trigger。当Trigger与JobDetail组合,就可以被Scheduler容器调度了。

触发器的分类

简单触发器【Simple】

只能完成一些简单的circle时间逻辑,每隔一段时间,进行任务触发

复杂计算器【cron】

可以定在任意时间点进行事件的触发

Second:秒

Minute:分

Hour:时

Day-of-month:月中的天

Month:月

Day-of-work:周中的天

Year:年

石英钟使用过程

创建JobDetail实例

注册调度器

调度触发器

计算时间触发时间

触发任务代码

执行job代码

工作原理

scheduler是一个计划调度器容器(总部),容器里面可以盛放众多的JobDetail和trigger,当容器启动后,里面的每个JobDetail都会根据trigger按部就班自动去执行。

JobDetail是一个可执行的工作,它本身可能是有状态的。

Trigger代表一个调度参数的配置,什么时候去调。

当JobDetail和Trigger在scheduler容器上注册后,形成了装配好的作业(JobDetail和Trigger所组成的一对儿),就可以伴随容器启动而调度执行了。

scheduler是个容器,容器中有一个线程池,用来并行调度执行每个作业,这样可以提高容器效率。

内部结构



将石英钟添加到京淘

代码逻辑

设置支付超时

判断是否超时:【当前的时间 - 创建的时间】结合【status的状态】

如果超时:归还库存

依赖

<!-- 石英钟任务 -->
<dependency>
<groupId>org.quartz-scheduler</groupId>
<artifactId>quartz</artifactId>
<version>2.2.1</version>
</dependency>


配置文件:applicationContext-scheduler.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd"> 
<!-- 定义任务bean -->
<bean name="paymentOrderJobDetail"
class="org.springframework.scheduling.quartz.JobDetailFactoryBean">
<!-- 指定具体的job类 -->
<property name="jobClass" value="com.peng.job.PayMentOrderJob" />
<!-- 指定job的名称 -->
<property name="name" value="paymentOrder" />
<!-- 指定job的分组 -->
<property name="group" value="Order" />
<!-- 必须设置为true,如果为false,当没有活动的触发器与之关联时会在调度器中删除该任务 -->
<property name="durability" value="true" />
<!-- 指定spring容器的key,如果不设定在job中的jobmap中是获取不到spring容器的 -->
<property name="applicationContextJobDataKey" value="applicationContext" />
</bean>

<!-- 定义触发器 -->
<bean id="cronTrigger"
class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">
<property name="jobDetail" ref="paymentOrderJobDetail" />
<!-- 每一天执行一次 -->
<property name="cronExpression" value="0 0 0/23 * * ?" />
</bean>

<!-- 定义调度器 -->
<bean class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers">
<list>
<ref bean="cronTrigger" />
</list>
</property>
</bean>

</beans>


java代码:PayMentOrderJob.java

package com.peng.job;

import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.quartz.QuartzJobBean;

public class PayMentOrderJob extends QuartzJobBean {

@Override
protected void executeInternal(JobExecutionContext context) throws JobExecutionException {
ApplicationContext applicationContrext = (ApplicationContext) context.getJobDetail().getJobDataMap()
.get("applicationContext");
System.err.println("定时任务执行中~~");
}
}


消息队列

RabbitMQ

引入

当前京淘的架构性能提升点

NGINX高并发

Redis内存缓存数据库(非结构数据)

amoeba提升数据最后关卡的性能

超负荷的请求,以上三个技术无法处理

当请求来到时,如果并发量太大,就让请求排成队列

基于erlang语言

消息队列分类

simple简单队列【先后顺序】

work工作模式【资源竞争】--红包

publish/subscribe发布订阅【共享资源】:引入交换机--邮件的群发、群聊天、广播

路由模式:消息的生产者发送给交换机,通过路由判断key值发送到相应的队列--error通知

topic主题模式(路由模式的一种):通过表达式进行判断--*代表多个单词,#号代表一个单词

注意:别名

publish:fanout

routing:direct

topic:topic

使用

依赖

<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.5.1</version>
</dependency>


流程

创建连接工厂

从连接工厂获取connection

从连接获取channel

从channel获取绑定的queue

生产者生产消息放入队列

释放资源

RabbitMQ的工作原理

单发送,单接收

使用场景:简单的发送与接收,没有设么特别的处理



示例【生产者】

public class Send {
private final static String QUEUE_NAME = "hello";//队列的名称

public static void main(String[] argv) throws Exception {
// 获取连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机IP
factory.setHost("localhost");
// 获取连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道找到队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
// 发送消息给队列
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
// 关闭连接
channel.close();
connection.close();
}
}


示例【消费者】

public class Recv {
// 队列名称
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
// 获得连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机IP
factory.setHost("localhost");
// 获得连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();
// 通道连接队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收队列
QueueingConsumer consumer = new QueueingConsumer(channel);
// 执行
channel.basicConsume(QUEUE_NAME, true, consumer);
// 遍历队列消息
while (true) {
// 传送队列信息
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}


单发送多接收

使用场景:一个发送端,多个接收端,如分布式的任务派发。为了保证消息发送的可靠性,不丢失消息,使消息持久化了。同时为了防止接收端在处理消息时down掉,只有在消息处理完成后才发送ack消息。



示例【生产者】

public class NewTask {
// 队列的名称
private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {
// 连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置主机IP
factory.setHost("localhost");
// 获取连接
Connection connection = factory.newConnection();
// 创建通道
Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

String message = getMessage(argv);
// PERSISTENT_TEXT_PLAIN:消息的持久化
channel.basicPublish("", TASK_QUEUE_NAME,
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}

private static String getMessage(String[] strings){
if (strings.length < 1)
return "Hello World!";
return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}


示例【消费者】

public class Worker {

private static final String TASK_QUEUE_NAME = "task_queue";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 使用了channel.basicQos(1)保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端
channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());

System.out.println(" [x] Received '" + message + "'");
doWork(message);
System.out.println(" [x] Done");

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}

private static void doWork(String task) throws InterruptedException {
for (char ch: task.toCharArray()) {
if (ch == '.') Thread.sleep(1000);
}
}
}


Publish/Subscribe

使用场景:发布、订阅模式,发送端发送广播消息,多个接收端接收



示例【生产者】

public class EmitLog {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

channel.close();
connection.close();
}

private static String getMessage(String[] strings){
if (strings.length < 1)
return "info: Hello World!";
return joinStrings(strings, " ");
}

private static String joinStrings(String[] strings, String delimiter) {
int length = strings.length;
if (length == 0) return "";
StringBuilder words = new StringBuilder(strings[0]);
for (int i = 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}


示例【消费者】

public class ReceiveLogs {

private static final String EXCHANGE_NAME = "logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());

System.out.println(" [x] Received '" + message + "'");
}
}
}


Routing (按路线发送接收)

使用场景:发送端按routing key发送消息,不同的接收端按不同的routing key接收消息



示例【生产者】

public class EmitLogDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

String severity = getSeverity(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");

channel.close();
connection.close();
}

private static String getSeverity(String[] strings){
if (strings.length < 1)
return "info";
return strings[0];
}

private static String getMessage(String[] strings){
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}

private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0 ) return "";
if (length < startIndex ) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}


示例【消费者】

public class ReceiveLogsDirect {

private static final String EXCHANGE_NAME = "direct_logs";

public static void main(String[] argv) throws Exception {

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1){
System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]");
System.exit(1);
}

for(String severity : argv){
channel.queueBind(queueName, EXCHANGE_NAME, severity);
}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();

System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
}


Topics (按topic发送接收)

使用场景:发送端不只按固定的routing key发送消息,而是按字符串“匹配”发送,接收端同样如此。



示例【生产者】

public class EmitLogTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

connection = factory.newConnection();
channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");

String routingKey = getRouting(argv);
String message = getMessage(argv);

channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");

}
catch  (Exception e) {
e.printStackTrace();
}
finally {
if (connection != null) {
try {
connection.close();
}
catch (Exception ignore) {}
}
}
}

private static String getRouting(String[] strings){
if (strings.length < 1)
return "anonymous.info";
return strings[0];
}

private static String getMessage(String[] strings){
if (strings.length < 2)
return "Hello World!";
return joinStrings(strings, " ", 1);
}

private static String joinStrings(String[] strings, String delimiter, int startIndex) {
int length = strings.length;
if (length == 0 ) return "";
if (length < startIndex ) return "";
StringBuilder words = new StringBuilder(strings[startIndex]);
for (int i = startIndex + 1; i < length; i++) {
words.append(delimiter).append(strings[i]);
}
return words.toString();
}
}


示例【消费者】

public class ReceiveLogsTopic {

private static final String EXCHANGE_NAME = "topic_logs";

public static void main(String[] argv) {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");

connection = factory.newConnection();
channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String queueName = channel.queueDeclare().getQueue();

if (argv.length < 1){
System.err.println("Usage: ReceiveLogsTopic [binding_key]...");
System.exit(1);
}

for(String bindingKey : argv){
channel.queueBind(queueName, EXCHANGE_NAME, bindingKey);
}

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);

while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();

System.out.println(" [x] Received '" + routingKey + "':'" + message + "'");
}
}
catch  (Exception e) {
e.printStackTrace();
}
finally {
if (connection != null) {
try {
connection.close();
}
catch (Exception ignore) {}
}
}
}
}


秒杀

业务场景分析

并发量很高的时间段--抢商品

队列中的消息可以是什么

电话号码

username

ticket

......

做法

调用SSO查询用户信息,把前n个消息获取到,后面的放入rabbitmq的垃圾桶

更高的并发可以考虑分布式的队列

文件位置

生产者:后台

消费者:前台

秒杀设计



未完待续。。。

注:参考文章

RabbitMQ的几种典型使用场景:https://www.rabbitmq.com/getstarted.html
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: