
Integrating Apache Ignite Grid Computing with Spring (a possible replacement for Dubbo and other distributed RPC frameworks)

2016-04-07 15:15
I have been using Apache Ignite for over a year now, and since adopting it I have not had to consider Dubbo or similar distributed RPC frameworks at all. Everything Dubbo can do, Ignite can do as well, and in my experience with better performance and throughput; it is true grid computing rather than merely distributed services. The project also keeps maturing with each release, which is encouraging. I believe grid computing will eventually displace the traditional pattern of hand-written services sitting behind LVS load balancing and distributed RPC calls.

The advantages of grid computing may not be obvious at first, so let me summarize roughly a year and a half of hands-on use. Apache Ignite can deliver real-time computation comparable to Storm, caching comparable to memcached or Redis, and distributed load balancing and computation; what attracts me most is how efficiently it lets you drive CPU, threads, and memory. That is why I abandoned Dubbo-style distributed RPC platforms: compared with grid computing they are essentially just load balancers and cannot fully utilize the CPUs of the whole cluster. A concrete example: suppose an API call takes 1s on a single server. A traditional RPC call through Dubbo, LVS, DNS, or any other reverse-proxy load balancer still ultimately lands on one server. Ignite, by contrast, can run every task in parallel across all servers, so with two servers the call takes roughly 0.5s, with three roughly 0.3s, and the latency keeps shrinking as servers are added. In my own deployment, a 10-server Ignite cluster has consistently been faster than my Storm setup for the same workloads. That is real-world experience.

If you build back-end APIs for mobile apps, try an Ignite-based architecture; the experience is genuinely compelling.
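To make the fan-out idea above concrete, here is a minimal sketch, with illustrative data and closure that are not from this project: ignite.compute().apply() runs one closure invocation per input element, load-balanced across the cluster, and collects the partial results back at the caller.

// Minimal fan-out sketch: one closure call per input element, spread over
// all server nodes; partial results come back to the calling node.
Ignite ignite = Ignition.start();
Collection<Integer> partials = ignite.compute().apply(
        (String chunk) -> chunk.length(),                  // executes on a remote node
        Arrays.asList("part-1", "part-2", "part-3"));      // inputs spread over the cluster
int total = partials.stream().mapToInt(Integer::intValue).sum();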

1. Add the POM dependencies

Define the ignite.version property in your POM's properties section first; any Ignite 1.x release from this period (for example 1.5.0.final) matches the configuration shown in this article.

<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-core</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spring</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-indexing</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-ssh</artifactId>
    <version>${ignite.version}</version>
</dependency>
<!--
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-spark</artifactId>
    <version>${ignite.version}</version>
</dependency>
-->
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-zookeeper</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-slf4j</artifactId>
    <version>${ignite.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.ignite</groupId>
    <artifactId>ignite-tools</artifactId>
    <version>${ignite.version}</version>
</dependency>

2. Configuring Apache Ignite in Spring. For production deployments, use ZooKeeper for peer-to-peer node discovery; a programmatic sketch of that setup follows the XML below.

<bean id="propertyConfigurer" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="systemPropertiesModeName" value="SYSTEM_PROPERTIES_MODE_FALLBACK"/>
<property name="searchSystemEnvironment" value="true"/>
</bean>
<!--
Abstract cache configuration for IGFS file data to be used as a template.
-->
<bean id="dataCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
<property name="backups" value="0"/>
<property name="memoryMode" value="ONHEAP_TIERED"/>
<property name="offHeapMaxMemory" value="0"/>
<property name="affinityMapper">
<bean class="org.apache.ignite.igfs.IgfsGroupDataBlocksKeyMapper">
<!-- How many sequential blocks will be stored on the same node. -->
<constructor-arg value="512"/>
</bean>
</property>
</bean>

<!--
Abstract cache configuration for IGFS metadata to be used as a template.
-->
<bean id="metaCacheCfgBase" class="org.apache.ignite.configuration.CacheConfiguration" abstract="true">
<property name="memoryMode" value="ONHEAP_TIERED"/>
<property name="offHeapMaxMemory" value="0"/>
<property name="cacheMode" value="REPLICATED"/>
<property name="atomicityMode" value="TRANSACTIONAL"/>
<property name="writeSynchronizationMode" value="FULL_SYNC"/>
</bean>
<bean id="igfsCfgBase" class="org.apache.ignite.configuration.FileSystemConfiguration" abstract="true">
<!-- Must correlate with cache affinity mapper. -->
<property name="blockSize" value="#{128 * 1024}"/>
<property name="perNodeBatchSize" value="512"/>
<property name="perNodeParallelBatchCount" value="16"/>
<property name="prefetchBlocks" value="32"/>
</bean>
<bean id="igniteServerConfig" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="gridName" value="sparkdeep"></property>
<property name="peerClassLoadingEnabled" value="false" />
<!--  		<property name="marshaller">  -->
<!--  			<bean class="org.apache.ignite.marshaller.optimized.OptimizedMarshaller">  -->
<!--  				<property name="requireSerializable" value="false" />  -->
<!--  			</bean>  -->
<!--  		</property>  -->
<property name="marshaller">
<bean class="org.apache.ignite.internal.binary.BinaryMarshaller"/>
</property>
<property name="serviceConfiguration">
<list>
<bean class="org.apache.ignite.services.ServiceConfiguration">
<property name="name" value="sparkIgniteServiceNode"/>
<property name="maxPerNodeCount" value="1"/>
<property name="totalCount" value="0"/>
<property name="service">
<ref bean="sparkIgniteService"/>
</property>
</bean>
</list>
</property>
<property name="atomicConfiguration">
<bean class="org.apache.ignite.configuration.AtomicConfiguration">
<!-- Set number of backups. -->
<property name="backups" value="1"/>
<!-- Set number of sequence values to be reserved. -->
<property name="atomicSequenceReserveSize" value="5000"/>
</bean>
</property>
<property name="metricsLogFrequency" value="0" />
<property name="gridLogger">
<bean class="org.apache.ignite.logger.slf4j.Slf4jLogger" />
</property>
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="forceServerMode" value="true"></property>
<property name="ipFinder">
<bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
</bean>
</property>
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Cache mode. -->
<property name="name" value="cacheservice"/>
<!-- Set initial cache capacity to ~ 100M. -->
<property name="startSize" value="#{100 * 1024 * 1024}"/>
<property name="cacheMode" value="PARTITIONED" />
<property name="evictionPolicy">
<!-- LRU eviction policy. -->
<bean class="org.apache.ignite.cache.eviction.lru.LruEvictionPolicy">
<!-- Set the maximum cache size to 1 million (default is 100,000). -->
<property name="maxSize" value="1000000"/>
</bean>
</property>
<property name="memoryMode" value="ONHEAP_TIERED"/>
<!-- This shows how to configure number of backups. The below configuration
sets the number of backups to 1 (which is default). -->
<property name="backups" value="0"/>
<property name="offHeapMaxMemory" value="0"/>
<property name="rebalanceBatchSize" value="#{1024 * 1024}"/>
<!-- Explicitly disable rebalance throttling. -->
<property name="rebalanceThrottle" value="0"/>
<!-- Set 4 threads for rebalancing. -->
<property name="rebalanceThreadPoolSize" value="4"/>
</bean>
<!-- File system metadata cache. -->
<bean class="org.apache.ignite.configuration.CacheConfiguration" parent="metaCacheCfgBase">
<property name="name" value="igfs-meta"/>
</bean>

<!-- File system files data cache. -->
<bean class="org.apache.ignite.configuration.CacheConfiguration" parent="dataCacheCfgBase">
<property name="name" value="igfs-data"/>
</bean>
</list>
</property>
<property name="fileSystemConfiguration">
<list>
<bean class="org.apache.ignite.configuration.FileSystemConfiguration" parent="igfsCfgBase">
<property name="name" value="imagefs"/>
<!-- Caches with these names must be configured. -->
<property name="metaCacheName" value="igfs-meta"/>
<property name="dataCacheName" value="igfs-data"/>
<!-- Configure TCP endpoint for communication with the file system instance. -->
<property name="ipcEndpointConfiguration">
<bean class="org.apache.ignite.igfs.IgfsIpcEndpointConfiguration">
<property name="type" value="TCP" />
<property name="host" value="0.0.0.0" />
<property name="port" value="10500" />
</bean>
</property>
</bean>
</list>
</property>
</bean>
<bean id="igniteSpringBean" class="org.apache.ignite.IgniteSpringBean" >
<property name="configuration">
<ref bean="igniteServerConfig"/>
</property>
</bean>
<bean id="igniteClientConfig" class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="clientMode" value="true" />
<property name="gridName" value="imageSdkClient"></property>
<property name="peerClassLoadingEnabled" value="false" />
<!-- Uncomment to provide custom configuration for executor service.
By default thread pool size is 100. All threads are pre-started and are available
for use. -->
<property name="publicThreadPoolSize" value="400" />
<!-- Uncomment to provide custom configuration for System executor service.
By default the thread pool size is 5 which should be good enough. Threads
are not started unless used. -->
<property name="systemThreadPoolSize" value="400" />
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean
class="org.apache.ignite.spi.discovery.tcp.ipfinder.jdbc.TcpDiscoveryJdbcIpFinder">
<property name="dataSource" ref="dataSource" />
</bean>
</property>
</bean>
</property>
</bean>
<bean id="sparkIgniteService" class="com.zsuper.ignite.service.impl.SparkIgniteServiceImpl">

</bean>
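The XML above uses the JDBC IP finder. As recommended at the start of this section, production deployments should use ZooKeeper discovery instead; a hedged programmatic sketch, assuming the ignite-zookeeper module from the POM is on the classpath and a ZooKeeper ensemble at the illustrative address below:

// Hedged sketch: swap the JDBC IP finder for ZooKeeper-based discovery.
// The connection string is illustrative; point it at your own ensemble.
TcpDiscoveryZookeeperIpFinder ipFinder = new TcpDiscoveryZookeeperIpFinder();
ipFinder.setZkConnectionString("zk1:2181,zk2:2181,zk3:2181");

TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
discoSpi.setIpFinder(ipFinder);

IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setDiscoverySpi(discoSpi);
Ignite ignite = Ignition.start(cfg);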

3. Deploying the service

public static <T> T createIgniteService(Ignite ignite, String serviceName, Class<T> serviceClass, boolean sticky) {
    // Obtain a client-side proxy for a deployed cluster service; when sticky
    // is true the proxy is pinned to a single remote node. (The original
    // hard-coded false here instead of passing the parameter through.)
    return ignite.services().serviceProxy(serviceName, serviceClass, sticky);
}

Usage:

@Resource(name = "igniteSpringBean")
protected Ignite ignite;

SparkIgniteService service = ServiceGenerator.createIgniteService(ignite, "sparkIgniteServiceNode", SparkIgniteService.class, false);
JavaSparkContext context = service.getSparkContext();
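For the proxy call above to resolve, the implementation class referenced in the XML must follow Ignite's service contract. A hedged sketch of what SparkIgniteServiceImpl presumably looks like; only getSparkContext() appears in this article, and the lifecycle bodies are illustrative:

public interface SparkIgniteService {
    JavaSparkContext getSparkContext();
}

public class SparkIgniteServiceImpl implements SparkIgniteService, Service {
    private transient JavaSparkContext context;   // per-node resource, not serialized

    @Override public void init(ServiceContext ctx) throws Exception {
        // Create expensive resources once per deployed instance (illustrative).
        context = new JavaSparkContext();
    }

    @Override public void execute(ServiceContext ctx) throws Exception {
        // Long-running service work would go here; nothing needed for a pure accessor.
    }

    @Override public void cancel(ServiceContext ctx) {
        if (context != null)
            context.stop();
    }

    @Override public JavaSparkContext getSparkContext() {
        return context;
    }
}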

Distributed computing:

public IgniteCompute getIgniteCompute() {
    return getIgnite().compute();
}

public ExecutorService getExecutorService() {
    return getIgnite().executorService();
}
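A brief usage sketch for the two accessors above: Ignite's executor service mirrors java.util.concurrent.ExecutorService, but submitted tasks run on server nodes and must therefore be Serializable. The payload here is illustrative.

// Runs the callable on some server node; the intersection cast makes the
// lambda serializable so it can be shipped across the cluster.
ExecutorService exec = getExecutorService();
Future<Integer> future = exec.submit((Callable<Integer> & Serializable) () -> 6 * 7);
int answer = future.get();   // 42, computed remotely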


public class FileBlockDownTask implements IgniteCallable<FileBlock> {

    private long startPos;
    private long endPos;
    private String key;
    private int blockCount;
    private IgniteAtomicLong count;

    public FileBlockDownTask(long startPos, long endPos, String key, int blockCount, IgniteAtomicLong atomic) {
        super();
        this.startPos = startPos;
        this.endPos = endPos;
        this.key = key;
        this.blockCount = blockCount;
        this.count = atomic;
    }

    public long getStartPos() {
        return startPos;
    }

    public void setStartPos(long startPos) {
        this.startPos = startPos;
    }

    public long getEndPos() {
        return endPos;
    }

    public void setEndPos(long endPos) {
        this.endPos = endPos;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public int getBlockCount() {
        return blockCount;
    }

    public void setBlockCount(int blockCount) {
        this.blockCount = blockCount;
    }

    @Override
    public FileBlock call() throws Exception {
        // Download one byte range of the object from Aliyun OSS.
        GetObjectRequest objectRequest = new GetObjectRequest(AliyunOSSUtils.DEFAULTBUCKET, key);
        objectRequest.setRange(startPos, endPos);
        OSSObject object = AliyunOSSUtils.getInstance().getObject(objectRequest);
        InputStream objectContent = object.getObjectContent();
        ByteArrayOutputStream output = new ByteArrayOutputStream();
        IOUtils.copy(objectContent, output);
        objectContent.close();
        // Shared cluster-wide counter tracks how many blocks have completed.
        long c = count.incrementAndGet();
        FileBlock block = new FileBlock();
        block.setStart(startPos);
        block.setEnd(endPos);
        block.setBlockCount(blockCount);
        block.setDatas(output.toByteArray());
        block.setCount(c);
        return block;
    }
}
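A hedged driver sketch for the callable above: split an object of totalSize bytes into fixed-size ranges, fan the range downloads out across the cluster with compute().call(), and track progress with a shared IgniteAtomicLong. Here totalSize, key, and the 4 MB block size are illustrative; FileBlock and AliyunOSSUtils are the article's own classes.

// Fan the byte-range downloads out over the cluster and collect the blocks.
IgniteAtomicLong progress = ignite.atomicLong(key + "-progress", 0, true);
long blockSize = 4L * 1024 * 1024;                            // illustrative 4 MB ranges
int blocks = (int) ((totalSize + blockSize - 1) / blockSize);
Collection<IgniteCallable<FileBlock>> tasks = new ArrayList<>();
for (int i = 0; i < blocks; i++) {
    long start = i * blockSize;
    long end = Math.min(start + blockSize - 1, totalSize - 1);
    tasks.add(new FileBlockDownTask(start, end, key, blocks, progress));
}
Collection<FileBlock> results = ignite.compute().call(tasks); // ranges run in parallel cluster-wide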


public class IgniteRunTask implements IgniteRunnable {

    private File file;

    public IgniteRunTask(File file) {
        super();
        this.file = file;
    }

    @Override
    public void run() {
        //xxxx
    }
}
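Usage sketch: a plain run() executes the task on one node chosen by the load balancer, while broadcast() runs it on every server node. The path is illustrative, and note that it must be readable on whichever node the task lands on.

ignite.compute().run(new IgniteRunTask(new File("/data/input.dat")));        // one node
ignite.compute().broadcast(new IgniteRunTask(new File("/data/input.dat")));  // every node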

Distributed MapReduce computation:

/**
 * Compares a probe image against a list of candidate images.
 * @author zhuyuping
 * 2016-01-28
 */
public class ImageMatchComputerJob extends ComputeTaskSplitAdapter<List<ImageMatchInfo>, String> {

    private QueryRunner run;

    private String sql = "select path from image_db where id=? and imgidx=? limit 1";

    public ImageMatchComputerJob() {
        super();
        run = new QueryRunner(DBHelper.getDateSource());
    }

    @Override
    public String reduce(List<ComputeJobResult> results) throws IgniteException {
        // Pick the candidate with the lowest (best) score.
        float min = 1.0f;
        String minid = null;
        for (ComputeJobResult computeJobResult : results) {
            String rs = computeJobResult.getData();
            if (rs != null) {
                String[] strs = rs.split(":");
                float s = Float.valueOf(strs[0]);
                String id = strs[1];
                if (s < min) {
                    min = s;
                    minid = id;
                }
            }
        }
        return minid;
    }

    @Override
    protected Collection<? extends ComputeJob> split(int gridSize, List<ImageMatchInfo> ids)
            throws IgniteException {
        // Split into one job per candidate; each job computes a match score.
        List<ComputeJob> jobs = Lists.newArrayList();
        for (ImageMatchInfo imageMatchInfo : ids) {
            jobs.add(new ComputeJob() {

                @Override
                public Object execute() throws IgniteException {
                    try {
                        String id = imageMatchInfo.getImageid();
                        String idxid = imageMatchInfo.getIdxid();
                        BufferedImage image = imageMatchInfo.getImage();
                        String path = run.query(sql, new ScalarHandler<String>(1), id, idxid);
                        BufferedImage search = ImageIO.read(AliyunOSSUtils.getObject(path));
                        float s = ImageSurf.compare(search, image);
                        return s + ":" + id;
                    } catch (Exception e) {
                        return null;
                    }
                }

                @Override
                public void cancel() {
                }
            });
        }

        return jobs;
    }
}
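Executing the task is a single call: split() fans one job per ImageMatchInfo out across the grid, and reduce() returns the id with the lowest score. The candidate list below is assumed to be prepared by the caller; loadCandidates() is a hypothetical helper.

List<ImageMatchInfo> candidates = loadCandidates();   // hypothetical helper
String bestMatchId = ignite.compute().execute(ImageMatchComputerJob.class, candidates);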

Usage examples:

public Long indexAsync(double[] qu, Long id) throws SQLException {
    byte[] quantised = ignite.compute().execute(KNNIndexComputeTaskSplitAdapter.class, knndatas);
    // Write the quantised index into the table store.
    Map<String, ColumnValue> maps = Maps.newHashMap();
    maps.put("vladindex", ColumnValue.fromBinary(quantised));
    TableStoreUtils.putRow(ImageSearchConsist.tableName, uid, id, maps);
    return id;
}

public void searchAsync(double[] qu, int k) throws SQLException {
    double[][] distance = ignite.compute().execute(KNNSearchComputeTaskSplitAdapter.class, knndatas);
    int nodes = ignite.cluster().forServers().nodes().size();
    //long size = ImageSearchIndexMetaUtils.countIndexSize(uid);
    List<Long> ids = ImageSearchIndexMetaUtils.getIdLists(uid);
    //long page = (size + nodes - 1) / nodes;
    final BoundedPriorityQueue<IntDoublePair> queue =
            new BoundedPriorityQueue<IntDoublePair>(k, IntDoublePair.SECOND_ITEM_ASCENDING_COMPARATOR);
    // ImageIndexSearchRunnable3 arguments: int min, int max, List<byte[]> data,
    // double[][] distances, int knnsize, BoundedPriorityQueue<IntDoublePair> queue
    List<ImageIndexSearchRunnable3> tasks = Lists.newArrayList();
    List<List<Long>> idlists = Lists.partition(ids, nodes);
    for (List<Long> list : idlists) {
        tasks.add(new ImageIndexSearchRunnable3(list, distance, knndatas.size(), k, ignite, uid));
    }
    ignite.compute().run(tasks);
    CollectionConfiguration colCfg = new CollectionConfiguration();
    colCfg.setCollocated(true);
    final IgniteQueue<LongDoublePair> resultqueue = ignite.queue(uid + "imagesearch-queue", k, colCfg);
    LongDoublePair pair = null;
    List<ImageSurfCompareTask> compareTasks = Lists.newArrayList();
    while ((pair = resultqueue.poll()) != null) {
        // drain per-node results from the distributed queue
    }
}

Distributed queue:

CollectionConfiguration colCfg = new CollectionConfiguration();
colCfg.setCollocated(true);
final IgniteQueue<LongDoublePair> resultqueue = ignite.queue(uid + "imagesearch-queue", k, colCfg);
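A brief producer/consumer sketch for the queue above: IgniteQueue implements java.util.concurrent.BlockingQueue, so any node that obtains the same named queue sees the same elements. The pair construction is an assumption about the LongDoublePair class used here, and the values are illustrative.

resultqueue.offer(new LongDoublePair(42L, 0.17));   // producer, possibly on another node
LongDoublePair best = resultqueue.poll();            // consumer; null when the queue is empty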

Distributed in-memory file system and distributed cache. Ignite also supports messaging/pub-sub, CEP, Spark integration, and a range of distributed data structures; the features shown above are the ones I use most.

fs = ignite.fileSystem("imagefs");
cache = ignite.cache("cacheservice");

IgfsPath vlad = new IgfsPath("/vlad");
fs.mkdirs(vlad);
IgfsOutputStream out = fs.create(new IgfsPath(vlad, "imageSDKVladIndex.index"), true);
File file = ResourceUtils.getFile("classpath:vladindexnew.index");
IOUtils.write(Files.toByteArray(file), out);
index = VLADIndexerUtils.read(file);
cache.put("imageSDKVladIndex", index);
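A read-back sketch mirroring the writes above: open the same IGFS path and fetch the cached index again, which works from any node in the grid. The names match the configuration earlier in this article.

// Open the IGFS file written above and read it back; fetch the cached index.
try (IgfsInputStream in = fs.open(new IgfsPath("/vlad/imageSDKVladIndex.index"))) {
    byte[] indexBytes = IOUtils.toByteArray(in);
}
Object cachedIndex = cache.get("imageSDKVladIndex");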


[Screenshot: starting and running the Ignite nodes]
In the next chapter we will walk through a hands-on Caffe deep learning tutorial.

Tags: apache ignite, dubbo, spring