您的位置:首页 > 其它

1 程序后台数据收集模块编写

2017-03-04 15:06 246 查看

JavaSDK设计规则

JavaSDK提供两个事件触发方法,分别为onChargeSuccess(支付成功触发事件,程序主动调用)和onChargeRefund(支付失败触发事件,程序主动调用)。我们在java sdk中通过一个单独的线程来发送收集数据,这样可以减少对业务系统的延时性。

开发环境

Linux + Windows开发

IDE:eclipse JavaEE

实现

1. AnalyticsEngineSDK — 分析引擎sdk Java服务器端数据收集,主要收集后台日志

package com.neu.ae.sdk;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* 分析引擎sdk Java服务器端数据收集,主要收集后台日志
*
* @author Administrator
*
*/
public class AnalyticsEngineSDK {
// 日志打印
private static final Logger log = Logger.getGlobal();
// 请求的url的主体部分
private static final String accessUrl = "http://localhost/analytics/__bfimg.gif";
private static final String platformName = "java_server";
private static final String sdkName = "jdk";
private static final String version = "1";

/**
* 触发订单支付成功事件,发送事件数据到服务器
*
* @param orderId
* 订单支付id
* @param memberId
* 订单支付会员id
* @return 如果发送数据成功(加入到发送队列中),返回true;否则返回false(参数异常&添加到发送队列失败)
* @throws UnsupportedEncodingException
*/
public static boolean onChargeSuccess(String orderId, String memberId) {
// 业务方不处理异常,出异常业务也不能停下
try {
if (isEmpty(orderId) || isEmpty(memberId)) {
// 如果订单id或会员id为空
log.log(Level.WARNING, "订单id和会员id不能为空");
return false;
}
// 代码执行到这,订单id和会员id不为空
Map<String, String> data = new HashMap<>(); // 构建数据散列映射表
// 按照需求分析设置数据
data.put("u_id", memberId);
data.put("oid", orderId);
data.put("c_time", String.valueOf(System.currentTimeMillis()));
data.put("ver", version);
data.put("en", "e_cs"); // 时间 event charge success
data.put("pl", platformName);
data.put("sdk", sdkName);

// 创建url
String url = buildUrl(data);
// 发送url,将url添加到队列中去
SendDataMonitor.addSendUrl(url);
return true;
} catch (Throwable e) {
// 打印异常
log.log(Level.WARNING, "发送数据异常", e);
}
return false;
}

/**
* 触发订单退款事件,发送退款数据到服务器
*
* @param orderId
* 退款订单id
* @param memberId
* 退款会员id
* @return 如果发送数据成功,返回true,否则返回false。
*/
public static boolean onChargeRefund(String orderId, String memberId) {
try {
if (isEmpty(orderId) || isEmpty(memberId)) {
// 如果订单id或会员id为空
log.log(Level.WARNING, "订单id和会员id不能为空");
return false;
}
// 代码执行到这,订单id和会员id不为空
Map<String, String> data = new HashMap<>(); // 构建数据散列映射表
// 严格按照需求分析设置数据
data.put("u_id", memberId);
data.put("oid", orderId);
data.put("c_time", String.valueOf(System.currentTimeMillis()));
data.put("ver", version);
data.put("en", "e_cr"); // 时间 event charge refound
data.put("pl", platformName);
data.put("sdk", sdkName);

// 创建url
String url = buildUrl(data);
// 发送url,将url添加到队列中去
SendDataMonitor.addSendUrl(url);
return true;
} catch (Throwable e) {
log.log(Level.WARNING, "发送数据异常", e);
}
return false;
}

/**
* 根据传输的Map参数构建url == 主体 + 参数
*
* @param data
* @return
* @throws UnsupportedEncodingException
*/
private static String buildUrl(Map<String, String> data) throws UnsupportedEncodingException {
StringBuilder sb = new StringBuilder();
sb.append(accessUrl).append("?");
for (Map.Entry<String, String> entry : data.entrySet()) {
if (isNotEmpty(entry.getKey()) && isNotEmpty(entry.getValue())) {
// 如果entry对象键、值都不为空
// 添加对象键值,trim去掉空白,值需要进行utf-8编码
sb.append(entry.getKey().trim()).append(URLEncoder.encode(entry.getValue().trim(), "utf-8"))
.append("&");
}
}
// accessUrl?c_time=xxx&u_mid=xxx&
return sb.substring(0, sb.length() - 1); // 删掉最后的&
}

/**
* 判断字符串是否为空,如果为空,返回true。否则返回false
*
* @param value
* @return
*/
private static boolean isEmpty(String value) {
return value == null || value.trim().isEmpty();
}

/**
* 判断字符串是否非空,非空返回true,空返回false
*
* @param value
* @return
*/
private static boolean isNotEmpty(String value) {
return !isEmpty(value);
}
}


2.SendDataMonitor — 发送url数据的监控者,用于启动一个单独的线程来发送数据

package com.neu.ae.sdk;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* 发送url数据的监控者,用于启动一个单独的线程来发送数据
*
* @author Administrator
*
*/
public class SendDataMonitor {
// 日志记录对象
private static final Logger log = Logger.getGlobal();
// 队列,用于存储发送url,存放 onChargeSuccess 和 onChargeRefound 方法的生成的url
private BlockingQueue<String> queue = new LinkedBlockingQueue<>();
// 用户单例的一个类对象
private static SendDataMonitor monitor = null;

private SendDataMonitor() {
// 私有构造方法,进行单例模式的创建
}

/**
* 获取单例的monitor对象实例
*
* @return
*/
public static SendDataMonitor getSendDataMonitor() {
if (monitor == null) {
// 同步代码块时,一个时间只能有一个线程得到执行
synchronized (SendDataMonitor.class) {
if (monitor == null) {
monitor = new SendDataMonitor();

Thread thread = new Thread(new Runnable() {

@Override
public void run() {
// 线程中调用具体的处理方法
SendDataMonitor.monitor.run();
}
});
// 测试的时候,不设置为守护模式
// thread.setDaemon(true);
thread.start();
}
}
}
return monitor;
}

/**
* 添加一个url到队列中去
*
* @param url
* @throws InterruptedException
*/
public static void addSendUrl(String url) throws InterruptedException {
getSendDataMonitor().queue.put(url);
}

/**
* 具体执行发送url的方法
*/
private void run() {
while (true) {
try {
// 监听着队列,有数据进来就发送
String url = this.queue.take();
// 正式发送url
HttpRequestUtil.sendData(url);
} catch (Throwable e) {
log.log(Level.WARNING, "发送url异常", e);
}
}
}

/**
* 内部类,用于发送数据的http工具类
*
* @author Administrator
*
*/
public static class HttpRequestUtil {
/**
* 具体发送url的方法
*
* @param url
* @throws MalformedURLException
*/
public static void sendData(String url) throws IOException {
HttpURLConnection con = null;
BufferedReader in = null;
try {
URL obj = new URL(url); // 创建url对象
con = (HttpURLConnection) obj.openConnection(); // 打开url连接
// 设置连接参数
con.setConnectTimeout(5000); // 连接过期时间 ms
con.setReadTimeout(5000); // 读取数据过期时间 ms
con.setRequestMethod("GET"); // 设置请求类型为GET

System.out.println("发送url:" + url);
// 发送连接请求
// con.connect();  使用连接语句不能实现数据同步
in = new BufferedReader(new InputStreamReader(con.getInputStream()));
} finally {
try {
if (in != null)
in.close();
} catch (Throwable e) {
// nothing
}
try {
con.disconnect();
} catch (Throwable e) {
// nothing
}
}
}
}
}


3. 测试类–Test

import com.neu.ae.sdk.AnalyticsEngineSDK;
/**
* 测试 Java sdk发送数据
* @author Administrator
*
*/
public class Test {
public static void main(String[] args) {
AnalyticsEngineSDK.onChargeSuccess("orderid123", "lzp123");
AnalyticsEngineSDK.onChargeRefund("orderid456", "lzp456");
}
}


输出:

输出
发送url:http://localhost/analytics/__bfimg.gif?c_time1488601498953&oidorderid123&pljava_server&ene_cs&sdkjdk&ver1&u_idlzp123


集群SDK检测

启动集群上的hdfs+nginx+flume进程,通过模拟数据的发送然后将数据发送到nginx服务器中,查看最终是否在hdfs中有数据的写入。

命令:

start-dfs.sh: 启动hdfs命令

su root:切换用户

service nginx restart: 启动nginx进程

启动flume进程:

进入flume安装根目录,执行命令:flume-ng agent –conf ./conf/ –conf-file ./conf/test2.conf –name agent &

结果:

ava 控制台运行

发出两条测试数据(付款成功 、付款成功)

192.168.175.1^A1488603777.646^A192.168.175.110^A/BfImg.gif?c_time1488603049604&oidorderid123&pljava_server&ene_cs&sdkjdk&ver1&u_idlzp123
192.168.175.1^A1488603777.654^A192.168.175.110^A/BfImg.gif?c_time1488603049606&oidorderid456&pljava_server&ene_cr&sdkjdk&ver1&u_idlzp456


系统日志:tail -f ~/access.log 和 HDFS上的日志logs/3/4下同步更新

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: