SolrCloud-如何在.NET程序中使用
2016-07-09 16:56
399 查看
https://github.com/vladen/SolrNet
原来我们在我们的项目里用的是根据数据库路由到不同的单机Solr服务器,但是这样的话,每次Solr配置的修改都要修改三台不通的服务器,而且一台服务器挂了,必定会影响一部分用户不能使用搜索功能,而且还会造成一定程度的丢数据,所以我们换一种方式。
两种可选方案:
主从模式
SolrCloud
经过对比,决定用SolrCloud,SolrCloud的概念和优缺点,就不再赘述了,网上一搜一大堆,这里主要写一下在C#如何使用SolrCloud。
最早我们用
首先SolrCloud是通过zookeeper来调度的,那么我们就要先去zookeeper上面去load可用的节点,并且对 zookeeper的 clusterstate.json 文件做心跳检测,如果clusterstate.json 里的内容有变化,则说明节点状态有变化,需要重新load文件里的内容进行解析,否则就在应用程序第一次调用的时候加载一次,缓存起来。
献上核心代码
要先通过NuGet安装Zookeeper Client
在
在Impl文件夹里实现接口
这样我们如果是write操作,就会调用leader分片对应的节点进行写入,如果是查询操作,则尽量直接调用非leader来进行查询,如果非leader节点挂了,那么久从主节点进行查询。
原来我们在我们的项目里用的是根据数据库路由到不同的单机Solr服务器,但是这样的话,每次Solr配置的修改都要修改三台不通的服务器,而且一台服务器挂了,必定会影响一部分用户不能使用搜索功能,而且还会造成一定程度的丢数据,所以我们换一种方式。
两种可选方案:
主从模式
SolrCloud
经过对比,决定用SolrCloud,SolrCloud的概念和优缺点,就不再赘述了,网上一搜一大堆,这里主要写一下在C#如何使用SolrCloud。
最早我们用
EasyNet.Solr来进行Solr查询的,但是貌似
EasyNet.Solr没有对SolrCloud的查询做封装,所以就各种找资料,最后只能自己封装了。
首先SolrCloud是通过zookeeper来调度的,那么我们就要先去zookeeper上面去load可用的节点,并且对 zookeeper的 clusterstate.json 文件做心跳检测,如果clusterstate.json 里的内容有变化,则说明节点状态有变化,需要重新load文件里的内容进行解析,否则就在应用程序第一次调用的时候加载一次,缓存起来。
献上核心代码
要先通过NuGet安装Zookeeper Client
在
EasyNet.Solr根目录定义接口
ISolrCloudOperrations.cs
using System; using System.Collections.Generic; using System.Linq; using System.Text; namespace EasyNet.Solr { public interface ISolrCloudOperrations { string GetSolrCloudServer(string collectionName, bool isWrite); } }
在Impl文件夹里实现接口
SolrCloudOperations.cs
internal static class ZookeeperStatus { //每隔两秒钟pingzookeeper服务器,并监听solr服务器状态,如果状态有改变,更新状态文件,请求连接列表 private static void Ping(string zkHost) { //如果是第一次调用,则加载配置文件 if (DataLength == 0) { foreach (var host in zkHost.Split(',').ToList()) { Start(host); } Task.Factory.StartNew(() => { while (true) { foreach (var host in zkHost.Split(',').ToList()) { Task.Factory.StartNew(Start, host); } Thread.Sleep(2000); } }); } } private static object pingObj = new object(); private static void Start(object zkHost) { var watcher = new Watcher(); using (var zk = new ZooKeeper(zkHost.ToString(), new TimeSpan(0, 0, 0, 10000), watcher)) { var dataChange = watcher.WaitUntilWatched(); Org.Apache.Zookeeper.Data.Stat stat = null; try { stat = zk.Exists("/clusterstate.json", false); } finally { if (stat != null) { byte[] data = null; lock (pingObj) { if (DataLength == 0 || DataLength != stat.DataLength) { data = zk.GetData("/clusterstate.json", false, stat); DataLength = stat.DataLength; SetShard(data); } } } } } } private static int DataLength = 0; private static Dictionary<string, List<Shard>> shards = new Dictionary<string, System.Collections.Generic.List<Shard>>(); private static ReaderWriterLockSlim rw = new ReaderWriterLockSlim(); private static void SetShard(byte[] data) { var str = System.Text.Encoding.UTF8.GetString(data); JObject jObj = JsonConvert.DeserializeObject<JObject>(str); List<Shard> shardList = new List<Shard>(); try { rw.EnterWriteLock(); foreach (var p in jObj) { foreach (var item in p.Value["shards"]) { var jItem = item.First as JObject; if (jItem["state"].ToString() == "active") { foreach (var replica in jItem["replicas"]) { var jReplica = replica.First as JObject; if (jReplica["state"].ToString() == "active") { Shard shard = new Shard() { BaseUrl = jReplica["base_url"].ToString() }; if (jReplica["leader"] != null && "true" == jReplica["leader"].ToString()) { shard.Leader = true; } shardList.Add(shard); } } } } if (shards.ContainsKey(p.Key)) { shards[p.Key] = shardList; } else { shards.Add(p.Key, shardList); } } } finally { rw.ExitWriteLock(); } } public static string ZookeeperHost = "127.0.0.1:2181"; public static string GetCollection(string collectionName, bool isWrite) { ///第一次调用,则开始ping if (DataLength == 0) { Ping(ZookeeperHost); } IEnumerable<Shard> tempShardList = null; try { rw.EnterReadLock(); var shardList = shards[collectionName]; if (!isWrite) { tempShardList = shardList.Where(s => s.Leader == false); } //如果从库挂了,那么只能从主库读取了 if (tempShardList == null || tempShardList.Count() == 0) tempShardList = shardList.Where(s => s.Leader == true); } finally { rw.ExitReadLock(); } if (tempShardList == null) throw new Exception("no active shard"); //随机取值 int random = new Random().Next(tempShardList.Count() - 1); return tempShardList.ToList()[random].BaseUrl; } } internal class Shard { public string BaseUrl { get; set; } public bool Leader { get; set; } } internal enum Changed { None, Children, Data } internal class Watcher : IWatcher { private readonly ManualResetEventSlim _changed = new ManualResetEventSlim(false); private WatchedEvent _event; public Changed WaitUntilWatched() { _changed.Wait(); if (_event == null) throw new ApplicationException("bad state"); if (_event.State != KeeperState.SyncConnected) throw new ApplicationException("cannot connect"); if (_event.Type == EventType.NodeChildrenChanged) { return Changed.Children; } if (_event.Type == EventType.NodeDataChanged) { return Changed.Data; } return Changed.None; } void IWatcher.Process(WatchedEvent @event) { _event = @event; _changed.Set(); } }
这样我们如果是write操作,就会调用leader分片对应的节点进行写入,如果是查询操作,则尽量直接调用非leader来进行查询,如果非leader节点挂了,那么久从主节点进行查询。
相关文章推荐
- CentOS7安装Docker,运行Nginx镜像、Centos镜像
- linux 文件的atime,ctime,mtime
- 3Spring整合struts2
- POJ - 2230 Watchcow
- 2深入使用Spring
- 安卓中下载APK到本地中(罗传榕)
- Bootstrap轮播(carousel)插件中图片变形的终极解决方案——使用jqthumb.js
- POJ3463 Sightseeing(dijkstra求最短路+次短路)
- Android App 接入微信登录套路导航
- Android下同时使用WIFI与3G网络
- 嵌入式系统和嵌入式操作系统
- Python简介和安装
- poj1979
- Transition(过渡动画效果)
- 1使用Spring
- C以及C++语言中static作用
- 2016.07.09 offsetWidth 和一个问题。
- iOS 浅拷贝和深拷贝的区别? copy和mutableCopy的区别?
- hdu 5480(维护前缀和+思路题)
- 流媒体技术基础-组播