DataX学习笔记-Writer插件开发(续)
2016-06-01 18:08
381 查看
之前那篇笔记基于的DataX版本比较低,现换成git上最新版本的DataX重新开发基于将数据写入ElasticSearch的Writer插件
1、检出DataX源码(git clone https://github.com/alibaba/DataX.git DataX),导入项目,新建一个eswriter的maven项目进行插件开发。
2、在DataX安装目录的plugins/writer目录下新建eswriter目录,目录下包含plugin_job_template.json、plugin.json、eswriter-0.0.1-SNAPSHOT.jar,同时在目录下创建一个libs目录,存放相关依赖的jar文件。
相关代码:
package com.alibaba.datax.plugin.writer.eswriter;
/**
 * Configuration key names read by the Elasticsearch writer plugin from the
 * job JSON ("parameter" section of the eswriter block). Each constant holds
 * the literal key string looked up via {@code Configuration.getString(...)}.
 */
public final class Key {

    /** Elasticsearch cluster name (matches the ES {@code cluster.name} setting). */
    public static final String esClusterName = "esClusterName";

    /** Comma-separated list of Elasticsearch node IPs to connect to. */
    public static final String esClusterIP = "esClusterIP";

    /** Elasticsearch transport port (defaults to 9300 when absent). */
    public static final String esClusterPort = "esClusterPort";

    /** Target Elasticsearch index name. */
    public static final String esIndex = "esIndex";

    /** Target Elasticsearch type (mapping) name. */
    public static final String esType = "esType";

    /** Separator-joined list of attribute names, one per record column. */
    public static final String attributeNameString = "attributeNameString";

    /** Separator used to split {@code attributeNameString} (defaults to ","). */
    public static final String attributeNameSplit = "attributeNameSplit";

    /** Fully-qualified name of the entity class records are mapped onto. */
    public static final String className = "className";

    /** Number of records per Elasticsearch bulk request (defaults to 1000). */
    public static final String batchSize = "batchSize";

    /** Constants holder — not instantiable. */
    private Key() {
    }
}
plugin_job_template.json
plugin.json
{
"name": "eswriter",
"class": "com.alibaba.datax.plugin.writer.eswriter.ESWriter",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport data to elasticsearch.",
"warn": ""
},
"developer": "wulin"
}
3、根据python bin/datax.py -r mysqlreader -w eswriter生成一个job/mysql_to_es.json文件,填写相关内容。
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id,userid,name,phone"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://192.168.0.114:3306/student?useUnicode=true&characterEncoding=UTF-8"],
"table": ["info"]
}
],
"password": "123456",
"username": "test",
"where": "id < 10"
}
},
"writer": {
"name": "eswriter",
"parameter": {
"attributeNameSplit": ",",
"attributeNameString": "id,userid,name,phone",
"className": "com.alibaba.datax.plugin.writer.eswriter.Student",
"esClusterName": "elasticsearch",
"esClusterIP": "192.168.0.105,192.168.0.108",
"esClusterPort": "9300",
"esIndex": "user",
"esType": "student",
"batchSize": "1000"
}
}
}
],
"setting": {
"speed": {
"channel": "10"
}
}
}
}
4、执行 python bin/datax.py job/mysql_to_es.json 启动数据同步任务。
1、检出DataX源码(git clone https://github.com/alibaba/DataX.git DataX),导入项目,新建一个eswriter的maven项目进行插件开发。
2、在DataX安装目录的plugins/writer目录下新建eswriter目录,目录下包含plugin_job_template.json、plugin.json、eswriter-0.0.1-SNAPSHOT.jar,同时在目录下创建一个libs目录,存放相关依赖的jar文件。
相关代码:
package com.alibaba.datax.plugin.writer.eswriter;

import java.lang.reflect.Field;
import java.net.InetSocketAddress;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.google.gson.Gson;
import com.umeng.es.config.EsServerAddress;

/**
 * DataX writer plugin that maps incoming records onto a user-supplied entity
 * class via reflection and bulk-indexes them into Elasticsearch through the
 * 2.x {@link TransportClient}.
 */
public class ESWriter extends Writer {

    public static class Job extends Writer.Job {

        private Configuration originalConfiguration = null;

        @Override
        public void init() {
            this.originalConfiguration = super.getPluginJobConf();
        }

        @Override
        public void prepare() {
            super.prepare();
        }

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
        }

        /**
         * Produces one configuration per task. Each task gets its own clone so
         * concurrent tasks do not share a single mutable Configuration (the
         * original handed the same instance to every task).
         */
        @Override
        public List<Configuration> split(int mandatoryNumber) {
            List<Configuration> writerSplitConfiguration = new ArrayList<Configuration>(mandatoryNumber);
            for (int i = 0; i < mandatoryNumber; i++) {
                writerSplitConfiguration.add(this.originalConfiguration.clone());
            }
            return writerSplitConfiguration;
        }
    }

    public static class Task extends Writer.Task {

        private static final Logger LOG = LoggerFactory.getLogger(Task.class);

        private Configuration writerSliceConfiguration = null;
        private String esClusterName = null;
        private String esClusterIP = null;
        private Integer esClusterPort = null;
        private String esIndex = null;
        private String esType = null;
        private String attributeNameString = null;
        private String attributeNameSplit = null;
        private String[] attributeNames = null;
        private String className = null;
        private Gson gson = null;
        private TransportClient client = null;
        private Integer batchSize = null;

        /** Reads all plugin parameters from the task slice configuration. */
        @Override
        public void init() {
            this.writerSliceConfiguration = super.getPluginJobConf();
            this.esClusterName = writerSliceConfiguration.getString(Key.esClusterName);
            this.esClusterIP = writerSliceConfiguration.getString(Key.esClusterIP);
            this.esClusterPort = writerSliceConfiguration.getInt(Key.esClusterPort, 9300);
            this.esIndex = writerSliceConfiguration.getString(Key.esIndex);
            this.esType = writerSliceConfiguration.getString(Key.esType);
            this.attributeNameString = writerSliceConfiguration.getString(Key.attributeNameString);
            this.attributeNameSplit = writerSliceConfiguration.getString(Key.attributeNameSplit, ",");
            // String.split takes a regex: quote the separator so configured
            // values such as "." or "|" are treated literally (the original
            // passed the separator raw).
            this.attributeNames = attributeNameString.split(Pattern.quote(attributeNameSplit));
            this.className = writerSliceConfiguration.getString(Key.className);
            this.batchSize = writerSliceConfiguration.getInt(Key.batchSize, 1000);
            this.gson = new Gson();
        }

        /** Builds the transport client and registers every configured node. */
        @Override
        public void prepare() {
            super.prepare();
            Settings settings = Settings.builder()
                    .put("cluster.name", esClusterName)
                    // Fixed typo: the original wrote "client.tansport.sniff",
                    // which Elasticsearch silently ignores, so sniffing was
                    // never actually enabled.
                    .put("client.transport.sniff", true)
                    .build();
            client = TransportClient.builder().settings(settings).build();
            // split(",") already yields a single-element array when there is
            // no comma, so the original contains(",") branch was redundant.
            for (String ip : esClusterIP.split(",")) {
                client.addTransportAddress(new InetSocketTransportAddress(
                        new InetSocketAddress(ip.trim(), esClusterPort)));
            }
        }

        @Override
        public void preCheck() {
            super.preCheck();
        }

        @Override
        public void preHandler(Configuration jobConfiguration) {
            super.preHandler(jobConfiguration);
        }

        @Override
        public void post() {
            super.post();
        }

        @Override
        public void postHandler(Configuration jobConfiguration) {
            super.postHandler(jobConfiguration);
        }

        @Override
        public void destroy() {
            // Guard: destroy() may run even when prepare() never created a
            // client (e.g. init failure) — the original would NPE here.
            if (client != null) {
                client.close();
            }
        }

        /**
         * Drains the receiver, buffering up to {@code batchSize} records and
         * flushing each full buffer (plus the final partial one) to ES.
         */
        @Override
        public void startWrite(RecordReceiver lineReceiver) {
            List<Record> writerBuffer = new ArrayList<Record>(this.batchSize);
            Record record;
            while ((record = lineReceiver.getFromReader()) != null) {
                writerBuffer.add(record);
                if (writerBuffer.size() >= this.batchSize) {
                    bulkSaveOrUpdateES(writerBuffer);
                    writerBuffer.clear();
                }
            }
            if (!writerBuffer.isEmpty()) {
                bulkSaveOrUpdateES(writerBuffer);
                writerBuffer.clear();
            }
        }

        /**
         * Converts each record into an instance of {@code className} by
         * matching column values to fields (case-insensitive, walking the
         * class hierarchy), then bulk-indexes the converted entities.
         * Conversion errors are logged and whatever converted cleanly is
         * still flushed (matches the original best-effort behavior).
         */
        private void bulkSaveOrUpdateES(List<Record> writerBuffer) {
            List<ESEntity> entities = new ArrayList<ESEntity>(writerBuffer.size());
            try {
                for (Record record : writerBuffer) {
                    // Null/empty check BEFORE dereferencing: the original
                    // called record.getColumnNumber() first and only then
                    // tested "null != record".
                    if (record == null || record.getColumnNumber() <= 0) {
                        continue;
                    }
                    Object object = Class.forName(className).newInstance();
                    int fieldNum = record.getColumnNumber();
                    Map<String, String> attributeValueMap = new HashMap<String, String>();
                    // Guard against a record carrying more columns than
                    // configured attribute names (original would throw
                    // ArrayIndexOutOfBoundsException).
                    int mapped = Math.min(fieldNum, attributeNames.length);
                    for (int i = 0; i < mapped; i++) {
                        attributeValueMap.put(attributeNames[i].toLowerCase(),
                                record.getColumn(i).asString());
                    }
                    for (Class<?> superClass = object.getClass(); superClass != Object.class;
                            superClass = superClass.getSuperclass()) {
                        for (Field field : superClass.getDeclaredFields()) {
                            String fieldNameLowerCase = field.getName().toLowerCase();
                            if (!attributeValueMap.containsKey(fieldNameLowerCase)) {
                                continue;
                            }
                            Object value = convertValueByFieldType(field.getType(),
                                    attributeValueMap.get(fieldNameLowerCase));
                            boolean accessible = field.isAccessible();
                            if (!accessible) {
                                field.setAccessible(true);
                            }
                            field.set(object, value);
                            if (!accessible) {
                                field.setAccessible(false);
                            }
                        }
                    }
                    entities.add((ESEntity) object);
                }
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
            bulkSaveOrUpdate(entities, esIndex, esType);
        }

        /**
         * Sends one bulk index request: each entity's {@code _id} becomes the
         * document id (and is removed from the serialized source).
         */
        private void bulkSaveOrUpdate(List<ESEntity> entities, String database, String table) {
            if (null == entities || entities.isEmpty()) {
                return;
            }
            BulkRequestBuilder prepareBulk = client.prepareBulk();
            for (ESEntity entity : entities) {
                IndexRequestBuilder irb = client.prepareIndex()
                        .setIndex(database).setType(table).setId(entity.get_id());
                entity.remove_id();
                irb.setSource(gson.toJson(entity));
                prepareBulk.add(irb);
            }
            prepareBulk.execute().actionGet();
        }

        /**
         * Coerces a column's string value into the target field type.
         * Null values map to type-specific defaults ("NA", false, 0, or the
         * current time for dates).
         * NOTE(review): DateFormat here is a project class (presumably a
         * ThreadLocal<SimpleDateFormat> holder named TIME), not
         * java.text.DateFormat — confirm its import/package.
         */
        private Object convertValueByFieldType(Class<?> type, Object value) {
            Object finalValue = value;
            if (String.class.isAssignableFrom(type)) {
                finalValue = null == value ? "NA" : String.valueOf(value);
            } else if (Boolean.class.isAssignableFrom(type)) {
                finalValue = null == value ? Boolean.FALSE
                        : Boolean.parseBoolean(String.valueOf(value));
            } else if (Integer.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0 : Integer.parseInt(String.valueOf(value));
            } else if (Long.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0 : Long.parseLong(String.valueOf(value));
            } else if (Float.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0 : Float.parseFloat(String.valueOf(value));
            } else if (Double.class.isAssignableFrom(type)) {
                finalValue = null == value ? 0 : Double.parseDouble(String.valueOf(value));
            } else if (Date.class.isAssignableFrom(type)) {
                try {
                    value = null == value ? DateFormat.TIME.get().format(new Date()) : value;
                    finalValue = DateFormat.TIME.get().parse(String.valueOf(value));
                } catch (ParseException e) {
                    LOG.error(e.getMessage(), e);
                }
            }
            return finalValue;
        }
    }
}
package com.alibaba.datax.plugin.writer.eswriter;
/**
 * Configuration key names understood by the Elasticsearch writer plugin.
 * Each constant holds the literal key string looked up in the job JSON
 * ("parameter" section of the eswriter block).
 */
public final class Key {
/**
 * Job-config key for the Elasticsearch cluster name
 * (must match the ES {@code cluster.name} setting).
 */
public final static String esClusterName = "esClusterName";
/**
 * Job-config key for the comma-separated list of Elasticsearch node IPs.
 */
public final static String esClusterIP = "esClusterIP";
/**
 * Job-config key for the Elasticsearch transport port
 * (the plugin defaults this to 9300 when absent).
 */
public final static String esClusterPort = "esClusterPort";
/**
 * Job-config key for the target Elasticsearch index name.
 */
public final static String esIndex = "esIndex";
/**
 * Job-config key for the target Elasticsearch type (mapping) name.
 */
public final static String esType = "esType";
/**
 * Job-config key for the separator-joined list of attribute names,
 * one name per record column.
 */
public final static String attributeNameString = "attributeNameString";
/**
 * Job-config key for the separator used to split the attribute name
 * string (the plugin defaults this to "," when absent).
 */
public final static String attributeNameSplit = "attributeNameSplit";
/**
 * Job-config key for the fully-qualified name of the entity class that
 * records are reflectively mapped onto.
 */
public final static String className = "className";
/**
 * Job-config key for the number of records per Elasticsearch bulk
 * request (the plugin defaults this to 1000 when absent).
 */
public final static String batchSize = "batchSize";
}
plugin_job_template.json
{ "name": "eswriter", "parameter": { "esClusterName": "", "esClusterIP": "192.168.0.1,192.168.0.2", "esClusterPort": "9300", "esIndex": "user", "esType": "student", "attributeNameString": "id,userid,name,phone", "attributeNameSplit": ",", "className": "com.alibaba.datax.plugin.writer.eswriter.Student", "batchSize": "1000" } }
plugin.json
{
"name": "eswriter",
"class": "com.alibaba.datax.plugin.writer.eswriter.ESWriter",
"description": {
"useScene": "only for developer test.",
"mechanism": "use datax framework to transport data to elasticsearch.",
"warn": ""
},
"developer": "wulin"
}
3、根据python bin/datax.py -r mysqlreader -w eswriter生成一个job/mysql_to_es.json文件,填写相关内容。
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"column": ["id,userid,name,phone"],
"connection": [
{
"jdbcUrl": ["jdbc:mysql://192.168.0.114:3306/student?useUnicode=true&characterEncoding=UTF-8"],
"table": ["info"]
}
],
"password": "123456",
"username": "test",
"where": "id < 10"
}
},
"writer": {
"name": "eswriter",
"parameter": {
"attributeNameSplit": ",",
"attributeNameString": "id,userid,name,phone",
"className": "com.alibaba.datax.plugin.writer.eswriter.Student",
"esClusterName": "elasticsearch",
"esClusterIP": "192.168.0.105,192.168.0.108",
"esClusterPort": "9300",
"esIndex": "user",
"esType": "student",
"batchSize": "1000"
}
}
}
],
"setting": {
"speed": {
"channel": "10"
}
}
}
}
4、执行 python bin/datax.py job/mysql_to_es.json 启动数据同步任务。
相关文章推荐
- 巧用mysql提示符prompt清晰管理数据库的方法
- 两大步骤教您开启MySQL 数据库远程登陆帐号的方法
- phpmyadmin 4+ 访问慢的解决方法
- linux系统下实现mysql热备份详细步骤(mysql主从复制)
- CentOS 5.5下安装MySQL 5.5全过程分享
- MySQL复制的概述、安装、故障、技巧、工具(火丁分享)
- MySQL中删除重复数据的简单方法
- 使用ElasticSearch6.0快速实现全文搜索功能的示例代码
- elasticsearch批量数据导入和导出
- 使用ElasticSearch+LogStash+Kibana+Redis搭建日志管理服务
- ElasticSearch 使用心得
- ES中如何使用逗号来分词
- ElasticSearch 守护进程 JSW
- Elasticsearch2.2.0数据操作
- Elasticsearch2.2.0安装ik中文分词
- Elasticsearch2.2.0安装pinyin插件
- ElasticSearch简单介绍
- ELK(ElasticSearch, Logstash, Kibana)搭建实时日志分析平台
- CENTOS安装ElasticSearch
- ElasticSearch的安装配置