故如虹,知恩;故如月,知明
排名
6
文章
6
粉丝
16
评论
8
{{item.articleTitle}}
{{item.blogName}} : {{item.content}}
ICP备案 :渝ICP备18016597号-1
网站信息:2018-2024TNBLOG.NET
技术交流:群号656732739
联系我们:contact@tnblog.net
欢迎加群交流技术

.NET Core使用ElasticSearch 三:常用帮助类封装

5359人阅读 2021/7/14 17:18 总访问:3913695 评论:3 收藏:1 手机
分类: .NET Core

把 .NET Core 操作 ElasticSearch 的常用方法封装成帮助类,包含查询、索引操作、删除、更新等。基于 ES 7.12.0;早期版本的部分写法已经不再适用,所以重新封装了一遍。
写了我好久

namespace ElasticsearchLearn.ElasticSearch
{
    public static class ElasticsearchExtands
    {
        /// <summary>
        /// 封装后的linq的查询方式
        /// </summary>
        /// <typeparam name="T">实体泛型</typeparam>
        /// <param name="indexName">index的名称</param>
        /// <param name="selector">linq内容</param>
        /// <returns></returns>
        public static async Task<List<T>> SearchByLinqAsync<T>(this IElasticSearchServer elasticSearchServer, string indexName, Func<QueryContainerDescriptor<T>, QueryContainer> selector = null) where T : class
        {
            var list = await elasticSearchServer.ElasticLinqClient.SearchAsync<T>(option => option.Index(indexName.ToLower()).Query(selector));
            return list.Documents.ToList();
        }

        /// <summary>
        /// 封装后的Json的查询方式
        /// </summary>
        /// <param name="indexName">index的名称</param>
        /// <param name="jsonString">json字符串</param>
        /// <returns>返回Jobject的内容</returns>
        public static async Task<JToken> SearchByJsonAsync(this IElasticSearchServer elasticSearchServer, string indexName, string jsonString)
        {
            var stringRespones = await elasticSearchServer.ElasticJsonClient.SearchAsync<StringResponse>(indexName.ToLower(), jsonString);
            var jobject = JObject.Parse(stringRespones.Body);
            var total = Convert.ToInt32(jobject["hits"]["total"]["value"].ToString());
            if (total > 0)
            {
                string json = string.Empty;
                var sourceArg = jobject["hits"]["hits"];
                foreach (var source in sourceArg)
                {
                    string sourceJson = source["_source"].ToString().Substring(1, source["_source"].ToString().Length - 1);
                    sourceJson = "{ \"_id\":\"" + source["_id"] + "\"," + sourceJson;
                    if (json.Length <= 0)
                        json += sourceJson;
                    else
                        json += "," + sourceJson;


                }
                return JToken.Parse("[" + json + "]");
            }
            return null;
        }

        /// <summary>
        /// 通过索引与id检查文档是否已经存在
        /// </summary>
        /// <param name="index">索引</param>
        /// <param name="id">id</param>
        /// <returns></returns>
        public static async Task<bool> SourceExistsAsync(this IElasticSearchServer elasticSearchServer, string index, string id)
        {
            bool flag = false;
            StringResponse resStr = null;
            try
            {
                //elasticSearchServer.ElasticJsonClient.Indices.Exists()
                resStr = await elasticSearchServer.ElasticJsonClient.SourceExistsAsync<StringResponse>(index, id);
                if (resStr.HttpStatusCode == 200)
                {
                    flag = true;
                }
            }
            catch (Exception ex)
            {
            }
            return flag;
        }

        /// <summary>
        /// 检测索引是否已经存在
        /// </summary>
        /// <param name="index"></param>
        /// <returns></returns>
        public static async Task<bool> IsIndexExsit(this IElasticSearchServer elasticSearchServer, string index)
        {
            bool flag = false;
            StringResponse resStr = null;
            try
            {
                resStr = await elasticSearchServer.ElasticJsonClient.Indices.ExistsAsync<StringResponse>(index);
                if (resStr.HttpStatusCode == 200)
                {
                    flag = true;
                }
            }
            catch (Exception ex)
            {
            }
            return flag;
        }
        
        /// <summary>
        /// 创建index
        /// </summary>
        /// <param name="indexName"></param>
        /// <param name="shards">分片数量,即数据块最小单元</param>
        /// <returns></returns>
        public static async Task<bool> CreateIndexAsync(this IElasticSearchServer elasticSearchServer, string indexName, int shards = 5)
        {
            var isHaveIndex = await IsIndexExsit(elasticSearchServer, indexName.ToLower());
            if (!isHaveIndex)
            {
                var stringResponse = await elasticSearchServer.ElasticJsonClient.Indices.CreateAsync<StringResponse>(indexName.ToLower(),
                        PostData.String($"{{\"settings\" : {{\"index\" : {{\"number_of_replicas\" : 0, \"number_of_shards\":\"{shards}\",\"refresh_interval\":\"-1\"}}}}}}"));
                var resObj = JObject.Parse(stringResponse.Body);
                if ((bool)resObj["acknowledged"])
                {
                    return true;
                }
            }
            else
            {
                return true;
            }
            return false;
        }


        /// <summary>
        /// 删除index
        /// </summary>
        /// <param name="indexName"></param>
        /// <returns></returns>
        public static async Task<bool> DeleteIndexAsync(this IElasticSearchServer elasticSearchServer, string indexName)
        {
            var stringRespones = await elasticSearchServer.ElasticJsonClient.Indices.DeleteAsync<StringResponse>(indexName.ToLower());
            var resObj = JObject.Parse(stringRespones.Body);
            if ((bool)resObj["acknowledged"])
            {
                return true;
            }
            return false;
        }

        /// <summary>
        /// 插入单个文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="objectDocment">文档内容</param>
        /// <param name="_id">自定义_id</param>
        /// <returns></returns>
        public static async Task<bool> InsertDocumentAsync(this IElasticSearchServer elasticSearchServer, string indexName, object objectDocment, string _id = "")
        {
            var stringRespones = new StringResponse();
            if (_id.Length > 0)
                stringRespones = await elasticSearchServer.ElasticJsonClient.IndexAsync<StringResponse>(indexName.ToLower(), _id, PostData.String(JsonConvert.SerializeObject(objectDocment)));
            else
                stringRespones = await elasticSearchServer.ElasticJsonClient.IndexAsync<StringResponse>(indexName.ToLower(), PostData.String(JsonConvert.SerializeObject(objectDocment)));
            var resObj = JObject.Parse(stringRespones.Body);
            if ((int)resObj["_shards"]["successful"] > 0)
            {
                return true;
            }
            return false;
        }


        /// <summary>
        /// 删除单个文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="_id">要删除的id</param>
        /// <returns></returns>
        public static async Task<bool> DeleteDocumentAsync(this IElasticSearchServer elasticSearchServer, string indexName, string _id)
        {
            bool flag = false;
            StringResponse resStr = null;
            try
            {
                resStr = await elasticSearchServer.ElasticJsonClient.DeleteAsync<StringResponse>(indexName.ToLower(), _id);
                var resObj = JObject.Parse(resStr.Body);
                if ((int)resObj["_shards"]["total"] == 0 || (int)resObj["_shards"]["successful"] > 0)
                {
                    flag = true;
                }
            }
            catch (Exception ex)
            {
            }

            return flag;
        }

        /// <summary>
        /// 更新文档  
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="_id">文档id</param>
        /// <param name="objectDocment">文档内容</param>
        /// <returns></returns>
        public static async Task<bool> UpdateDocumentAsync(this IElasticSearchServer elasticSearchServer, string indexName, string id, object objectDocment)
        {
            bool flag = false;
            try
            {

                string json = JsonConvert.SerializeObject(objectDocment);

                var updateToJson = "{\"doc\":" + json + "}";

                var stringRespones = await elasticSearchServer.ElasticJsonClient.UpdateAsync<StringResponse>(indexName, id, PostData.String(updateToJson));
                var resObj = JObject.Parse(stringRespones.Body);
                if ((int)resObj["_shards"]["successful"] > 0)
                {
                    return true;
                }
            }
            catch { }
            return flag;
        }

        /// <summary>
        /// 通过Bulk更新文档  
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="_id">文档id</param>
        /// <param name="objectDocment">文档内容</param>
        /// <returns></returns>
        public static async Task<bool> UpdateDocumentByBulkAsync(this IElasticSearchServer elasticSearchServer, string indexName, string _id, object objectDocment)
        {
            bool flag = false;
            try
            {
                string json = JsonConvert.SerializeObject(objectDocment);
                if (json.IndexOf("[") == 0)
                {
                    var objectDocmentOne = JToken.Parse(json);
                    json = JsonConvert.SerializeObject(objectDocmentOne[0]);
                }
                int idInt = json.IndexOf("\"_id");
                if (idInt > 0)
                {
                    string idJson = json.Substring(idInt, json.IndexOf(_id) + _id.Length + 1);
                    json = json.Replace(idJson, "");
                }
                List<string> list = new List<string>();
                list.Add("{\"update\":{\"_id\":\"" + _id + "\"}}");
                list.Add("{\"doc\":" + json + "}");

                var stringRespones = await elasticSearchServer.ElasticJsonClient.BulkAsync<StringResponse>(indexName.ToLower(), PostData.MultiJson(list));
                var resObj = JObject.Parse(stringRespones.Body);
                if (!(bool)resObj["errors"])
                {
                    return true;
                }
            }
            catch { }
            return flag;
        }
    }
}

在控制器中测试的代码如下:

namespace ElasticsearchLearn.Controllers
{
    public class HomeController : Controller
    {
        private readonly ILogger<HomeController> _logger;
        private readonly IElasticSearchServer _elasticSearchServer;

        public HomeController(ILogger<HomeController> logger, IElasticSearchServer elasticSearchServer)
        {
            _logger = logger;
            _elasticSearchServer = elasticSearchServer;
        }

        public IActionResult Index()
        {
            return View();
        }

        /// <summary>
        /// 基于Linq的查询
        /// </summary>
        public async Task<List<Persons>> SearchByLinq(string user = "批量")
        {
            var list = await _elasticSearchServer.SearchByLinqAsync<Persons>("users", op => op.Match(
                                           ss => ss.Field(
                                               qq => qq.user == user)));

            return list;
        }

        /// <summary>
        /// 基于Json的查询
        /// </summary>
        public async Task<string> SearchByJson(string user = "批量")
        {

            var jsonobject = new { query = new { match = new { user = "批量" } } };
            string json = JsonConvert.SerializeObject(jsonobject);

            var jToken = await _elasticSearchServer.SearchByJsonAsync("users", json);

            return jToken.ToString();
        }

        /// <summary>
        /// 插入单个文档
        /// </summary>
        /// <param name="indexName">索引名称</param>
        /// <param name="objectDocment">文档内容</param>
        /// <param name="_id">自定义_id</param>
        /// <returns></returns>
        public async Task<bool> InsertDocument()
        {
            var content = new
            {
                user = "嘻嘻",
                post_date = "2021-10-11T15:00:12",
                message = "...."
            };

            bool result = await _elasticSearchServer.InsertDocumentAsync("users", content, "1");
            return result;
        }

        /// <summary>
        /// 删除单个文档
        /// </summary>
        /// <param name="index"></param>
        /// <param name="id"></param>
        /// <returns></returns>
        public async Task<bool> DeleteDocument(string index = "users", string id = "2")
        {
            bool result = await _elasticSearchServer.DeleteDocumentAsync(index, id);
            return result;
        }

        /// <summary>
        /// 通过Bulk更新文档  
        /// </summary>
        /// <returns></returns>
        public async Task<bool> UpdateDocumentByBulk()
        {
            var content = new
            {
                user = "嘻嘻2",
                post_date = "2021-10-11T15:00:12",
                message = "更新一下"
            };

            bool result = await _elasticSearchServer.UpdateDocumentByBulkAsync("users", "1", content);
            return result;
        }

        /// <summary>
        /// 更新文档  
        /// </summary>
        /// <returns></returns>
        public async Task<bool> UpdateDocument()
        {
            var content = new
            {
                user = "嘻嘻3",
                post_date = "2021-10-11T15:00:12",
                message = "更新一下333"
            };

            bool result = await _elasticSearchServer.UpdateDocumentAsync("users", "1", content);
            return result;
        }

        public async Task<bool> SourceExistsAsync(string index = "users", string id = "1")
        {
            bool result = await _elasticSearchServer.SourceExistsAsync(index, id);
            return result;
        }

        /// <summary>
        /// 检测索引是否已经存在
        /// </summary>
        public async Task<bool> IsIndexExsit(string index = "users")
        {
            bool result = await _elasticSearchServer.IsIndexExsit(index);
            return result;
        }
        
        /// <summary>
        /// 创建index
        /// </summary>
        /// <param name="index"></param>
        /// <returns></returns>
        public async Task<bool> CreateIndex(string index)
        {
            if (string.IsNullOrWhiteSpace(index))
                return false;

            bool result = await _elasticSearchServer.CreateIndexAsync(index);
            return result;
        }

        /// <summary>
        /// 删除index
        /// </summary>
        /// <param name="index"></param>
        /// <returns></returns>
        public async Task<bool> DeleteIndex(string index)
        {
            if (string.IsNullOrWhiteSpace(index))
                return false;

            bool result = await _elasticSearchServer.DeleteIndexAsync(index);
            return result;
        }

    }

部分效果如下:

其实调用方法的时候我们可以看到执行的原生语句是什么:


贴一下测试用的原生语句:

## Add a document (create with explicit id 2)
PUT users/_create/2
{
     "user" : "xxx",
    "post_date" : "2021-10-11T14:40:12",
    "message" : "trying out Elasticsearch"
}

## Get a document by index and id
GET users/_doc/1

## Multi-get: fetch several documents in one request
GET /_mget
{
    "docs" : [
        {
            "_index" : "users",
            "_id" : "1"
        },
        {
            "_index" : "users",
            "_id" : "2"
        },
        {
            "_index" : "users",
            "_id" : "3"
        }
    ]
}

## Update a document (partial update through "doc")
POST users/_update/1
{
     "doc" : {
         "user" : "嘻嘻1"
    }
}

## Check whether the index exists
HEAD users


再贴一个代码下载地址:https://download.tnblog.net/resource/index/4407008a3846478c8eb0e33e48eff418


欢迎加群讨论技术,群:677373950(满了,可以加,但通过不了),2群:656732739

评价