乐闻世界logo
搜索文章和话题

如何使用 elasticsearch 搜索 mongodb 的数据?

6 个月前提问
2 个月前修改
浏览次数91

7个答案

1
2
3
4
5
6
7

使用 Elasticsearch 搜索 MongoDB 数据的步骤

1. 数据同步(同步 MongoDB 数据到 Elasticsearch)

首先,需要将 MongoDB 中的数据同步到 Elasticsearch。这可以通过多种方式实现,常见的方法包括使用 Logstash 或者自定义脚本来进行数据迁移。

示例使用 Logstash:

  1. 安装 Logstash。
  2. 创建一个配置文件 (mongo_to_es.conf),内容如下:
conf
input { mongodb { uri => 'mongodb://localhost:27017' placeholder_db_dir => '/opt/logstash-mongodb/' placeholder_db_name => 'logstash_sqlite.db' collection => 'your_collection' batch_size => 5000 } } filter { # 这里可以添加数据处理的filter } output { elasticsearch { hosts => ["localhost:9200"] index => "mongodb_index" document_type => "your_type" } }
  1. 运行 Logstash 配置:
bash
logstash -f mongo_to_es.conf

2. 查询设计

一旦数据同步到 Elasticsearch,就可以利用 Elasticsearch 的强大搜索功能来设计和优化查询。例如,可以利用 Elasticsearch 的全文搜索功能、聚合查询等。

示例查询:

假设我们需要在 MongoDB 的数据中搜索特定的用户信息,可以在 Elasticsearch 中这样查询:

bash
GET /mongodb_index/_search { "query": { "match": { "username": "john_doe" } } }

3. 结果处理

查询结果将以 JSON 格式返回,可以在应用程序中进一步处理这些数据以满足业务需求。

示例处理:

可以在后端服务中解析 Elasticsearch 返回的 JSON 数据,根据实际需要转换数据格式或执行其他业务逻辑。

4. 数据更新和维护

为了保持 Elasticsearch 和 MongoDB 的数据一致性,需要定期或实时同步 MongoDB 的数据更改到 Elasticsearch。这可以通过定时任务或监听 MongoDB 的变更流(Change Streams)实现。

示例使用 MongoDB Change Streams:

可以编写一个脚本或服务监听 MongoDB 的 Change Streams,一旦检测到数据变动(如增加、删除、修改),即时更新 Elasticsearch 数据。

python
import pymongo from elasticsearch import Elasticsearch client = pymongo.MongoClient('mongodb://localhost:27017') db = client.your_database collection = db.your_collection es = Elasticsearch(['http://localhost:9200']) change_stream = collection.watch() for change in change_stream: if change['operationType'] == 'insert': es.index(index='mongodb_index', doc_type='your_type', id=change['documentKey']['_id'], body=change['fullDocument']) elif change['operationType'] == 'update': es.update(index='mongodb_index', doc_type='your_type', id=change['documentKey']['_id'], body={'doc': change['updateDescription']['updatedFields']}) elif change['operationType'] == 'delete': es.delete(index='mongodb_index', doc_type='your_type', id=change['documentKey']['_id'])

总结

通过以上步骤,可以实现使用 Elasticsearch 来搜索和分析存储在 MongoDB 中的数据。这种方式利用了 Elasticsearch 的强大搜索和分析能力,同时保持了 MongoDB 的灵活性和强大的文档存储功能。

2024年6月29日 12:07 回复

这个答案应该足以让您准备好遵循有关使用 MongoDB、Elasticsearch 和 AngularJS 构建功能搜索组件的教程。

如果您希望对来自 API 的数据使用分面搜索,那么您可能需要看看Matthiasn 的BirdWatch Repo 。

下面介绍了如何设置单节点 Elasticsearch“集群”来索引 MongoDB,以便在新的 EC2 Ubuntu 14.04 实例上的 NodeJS、Express 应用程序中使用。

确保一切都是最新的。

shell
sudo apt-get update

安装 NodeJS。

shell
sudo apt-get install nodejs sudo apt-get install npm

安装 MongoDB - 这些步骤直接来自 MongoDB 文档。选择您喜欢的任何版本。我坚持使用 v2.4.9,因为它似乎是MongoDB-River支持的最新版本,没有问题。

导入 MongoDB 公共 GPG 密钥。

shell
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10

更新您的来源列表。

shell
echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list

获取 10gen 包。

shell
sudo apt-get install mongodb-10gen

如果您不需要最新版本,请选择您的版本。如果您在 Windows 7 或 8 计算机上设置环境,请远离 v2.6,直到它们通过将其作为服务运行来解决一些错误。

shell
apt-get install mongodb-10gen=2.4.9

防止更新时 MongoDB 安装版本被提升。

shell
echo "mongodb-10gen hold" | sudo dpkg --set-selections

启动 MongoDB 服务。

shell
sudo service mongodb start

您的数据库文件默认为 /var/lib/mongo,日志文件默认为 /var/log/mongo。

通过 mongo shell 创建一个数据库并向其中推送一些虚拟数据。

shell
mongo YOUR_DATABASE_NAME db.createCollection(YOUR_COLLECTION_NAME) for (var i = 1; i <= 25; i++) db.YOUR_COLLECTION_NAME.insert( { x : i } )

现在将独立的 MongoDB 转换为副本集

首先关闭进程。

shell
mongo YOUR_DATABASE_NAME use admin db.shutdownServer()

现在我们将 MongoDB 作为服务运行,因此当我们重新启动 mongod 进程时,我们不会在命令行参数中传递“--replSet rs0”选项。相反,我们将其放入 mongod.conf 文件中。

shell
vi /etc/mongod.conf

添加这些行,替换您的数据库和日志路径。

shell
replSet=rs0 dbpath=YOUR_PATH_TO_DATA/DB logpath=YOUR_PATH_TO_LOG/MONGO.LOG

现在再次打开 mongo shell 来初始化副本集。

shell
mongo DATABASE_NAME config = { "_id" : "rs0", "members" : [ { "_id" : 0, "host" : "127.0.0.1:27017" } ] } rs.initiate(config) rs.slaveOk() // allows read operations to run on secondary members.

现在安装 Elasticsearch。我只是遵循这个有用的Gist

确保已安装 Java。

shell
sudo apt-get install openjdk-7-jre-headless -y

暂时使用 v1.1.x,直到 Mongo-River 插件错误在 v1.2.1 中得到修复。

shell
wget https://download.elasticsearch.org/elasticsearch/elasticsearch/elasticsearch-1.1.1.deb sudo dpkg -i elasticsearch-1.1.1.deb curl -L http://github.com/elasticsearch/elasticsearch-servicewrapper/tarball/master | tar -xz sudo mv *servicewrapper*/service /usr/local/share/elasticsearch/bin/ sudo rm -Rf *servicewrapper* sudo /usr/local/share/elasticsearch/bin/service/elasticsearch install sudo ln -s `readlink -f /usr/local/share/elasticsearch/bin/service/elasticsearch` /usr/local/bin/rcelasticsearch

如果您目前仅在单个节点上进行开发,请确保 /etc/elasticsearch/elasticsearch.yml 启用了以下配置选项:

shell
cluster.name: "MY_CLUSTER_NAME" node.local: true

启动 Elasticsearch 服务。

shell
sudo service elasticsearch start

验证它是否正常工作。

shell
curl http://localhost:9200

如果你看到这样的东西那么你就很好了。

shell
{ "status" : 200, "name" : "Chi Demon", "version" : { "number" : "1.1.2", "build_hash" : "e511f7b28b77c4d99175905fac65bffbf4c80cf7", "build_timestamp" : "2014-05-22T12:27:39Z", "build_snapshot" : false, "lucene_version" : "4.7" }, "tagline" : "You Know, for Search" }

现在安装 Elasticsearch 插件,以便它可以与 MongoDB 一起使用。

shell
bin/plugin --install com.github.richardwilly98.elasticsearch/elasticsearch-river-mongodb/1.6.0 bin/plugin --install elasticsearch/elasticsearch-mapper-attachments/1.6.0

这两个插件不是必需的,但它们非常适合测试查询和可视化索引的更改。

shell
bin/plugin --install mobz/elasticsearch-head bin/plugin --install lukas-vlcek/bigdesk

重新启动 Elasticsearch。

shell
sudo service elasticsearch restart

最后索引 MongoDB 中的集合。

shell
curl -XPUT localhost:9200/_river/DATABASE_NAME/_meta -d '{ "type": "mongodb", "mongodb": { "servers": [ { "host": "127.0.0.1", "port": 27017 } ], "db": "DATABASE_NAME", "collection": "ACTUAL_COLLECTION_NAME", "options": { "secondary_read_preference": true }, "gridfs": false }, "index": { "name": "ARBITRARY INDEX NAME", "type": "ARBITRARY TYPE NAME" } }'

检查您的索引是否在 Elasticsearch 中

shell
curl -XGET http://localhost:9200/_aliases

检查您的集群运行状况。

shell
curl -XGET 'http://localhost:9200/_cluster/health?pretty=true'

它可能是黄色的,有一些未分配的碎片。我们必须告诉 Elasticsearch 我们想要使用什么。

shell
curl -XPUT 'localhost:9200/_settings' -d '{ "index" : { "number_of_replicas" : 0 } }'

再次检查集群健康状况。现在应该是绿色的。

shell
curl -XGET 'http://localhost:9200/_cluster/health?pretty=true'

去玩。

2024年6月29日 12:07 回复

当您的运营规模扩大时,使用河流可能会出现问题。River 在繁重的操作下会使用大量内存。我建议实现您自己的 Elasticsearch 模型,或者如果您使用 mongoose,您可以直接在其中构建您的 Elasticsearch 模型,或者使用mongoosastic,它基本上可以为您完成此操作。

Mongodb River 的另一个缺点是您将陷入使用 mongodb 2.4.x 分支和 ElasticSearch 0.90.x 的困境。您会开始发现您错过了很多非常好的功能,并且 mongodb River 项目只是没有足够快地生产可用的产品来保持稳定。也就是说,Mongodb River 绝对不是我会投入生产的东西。它带来的问题多于其价值。它会在重负载下随机丢弃写入,会消耗大量内存,并且没有设置可以限制这一点。此外,River 不会实时更新,它会从 mongodb 读取 oplog,根据我的经验,这可能会延迟更新长达 5 分钟。

我们最近不得不重写项目的很大一部分,因为 ElasticSearch 每周都会出现问题。我们甚至聘请了一位开发运营顾问,他也同意最好远离 River。

更新: Elasticsearch-mongodb-river 现在支持 ES v1.4.0 和 mongodb v2.6.x。但是,您仍然可能会在繁重的插入/更新操作中遇到性能问题,因为此插件将尝试读取 mongodb 的 oplog 进行同步。如果自锁(或闩锁)解锁以来有很多操作,您会注意到您的elasticsearch服务器上的内存使用率极高。如果您打算开展大型业务,河流并不是一个好的选择。ElasticSearch 的开发人员仍然建议您使用适合您语言的客户端库直接与他们的 API 进行通信,而不是使用 River 来管理自己的索引。这并不是河流的真正目的。Twitter-river 是如何使用河流的一个很好的例子。它本质上是一种从外部来源获取数据的好方法,但对于高流量或内部使用来说不太可靠。

还要考虑到 mongodb-river 版本落后,因为它不是由 ElasticSearch 组织维护,而是由第三方维护。v1.0发布后很长一段时间开发都停留在v0.90分支上,当v1.0的版本发布时并不稳定,直到elasticsearch发布了v1.3.0。Mongodb 版本也落后了。当您希望迁移到每个版本的更高版本时,您可能会发现自己陷入困境,尤其是 ElasticSearch 正在大力开发,并且即将推出许多非常令人期待的功能。掌握最新的 ElasticSearch 非常重要,因为我们严重依赖不断改进的搜索功能作为我们产品的核心部分。

总而言之,如果您自己动手,您可能会得到更好的产品。没那么难。它只是在代码中管理的另一个数据库,并且可以轻松地将其放入现有模型中,而无需进行重大重构。

2024年6月29日 12:07 回复

如果您想要一个几乎实时同步和通用的解决方案,River 是一个很好的解决方案。

如果您已经在 MongoDB 中拥有数据,并且希望像“一次性”一样轻松地将其发送到 Elasticsearch,您可以在 Node.js https://github.com/itemsapi/elasticbulk中尝试我的包。

它使用 Node.js 流,因此您可以从支持流的所有内容(即 MongoDB、PostgreSQL、MySQL、JSON 文件等)导入数据

MongoDB 到 Elasticsearch 的示例:

安装包:

shell
npm install elasticbulk npm install mongoose npm install bluebird

创建脚本即script.js:

shell
const elasticbulk = require('elasticbulk'); const mongoose = require('mongoose'); const Promise = require('bluebird'); mongoose.connect('mongodb://localhost/your_database_name', { useMongoClient: true }); mongoose.Promise = Promise; var Page = mongoose.model('Page', new mongoose.Schema({ title: String, categories: Array }), 'your_collection_name'); // stream query var stream = Page.find({ }, {title: 1, _id: 0, categories: 1}).limit(1500000).skip(0).batchSize(500).stream(); elasticbulk.import(stream, { index: 'my_index_name', type: 'my_type_name', host: 'localhost:9200', }) .then(function(res) { console.log('Importing finished'); })

发送您的数据:

shell
node script.js

它不是非常快,但它可以处理数百万条记录(感谢流)。

2024年6月29日 12:07 回复

在这里,我发现了另一个将 MongoDB 数据迁移到 Elasticsearch 的好选择。一个 go 守护进程,将 mongodb 实时同步到 elasticsearch。它是蒙斯塔什。出售地点: Monstache

下面的初始设置来配置和使用它。

步骤1:

shell
C:\Program Files\MongoDB\Server\4.0\bin>mongod --smallfiles --oplogSize 50 --replSet test

第2步 :

shell
C:\Program Files\MongoDB\Server\4.0\bin>mongo C:\Program Files\MongoDB\Server\4.0\bin>mongo MongoDB shell version v4.0.2 connecting to: mongodb://127.0.0.1:27017 MongoDB server version: 4.0.2 Server has startup warnings: 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** WARNING: Access control is not enabled for the database. 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** Read and write access to data and configuration is unrestricted. 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** WARNING: This server is bound to localhost. 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** Remote systems will be unable to connect to this server. 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** Start the server with --bind_ip <address> to specify which IP 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** addresses it should serve responses from, or with --bind_ip_all to 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** bind to all interfaces. If this behavior is desired, start the 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] ** server with --bind_ip 127.0.0.1 to disable this warning. 2019-01-18T16:56:44.931+0530 I CONTROL [initandlisten] MongoDB Enterprise test:PRIMARY>

步骤 3:验证复制。

shell
MongoDB Enterprise test:PRIMARY> rs.status(); { "set" : "test", "date" : ISODate("2019-01-18T11:39:00.380Z"), "myState" : 1, "term" : NumberLong(2), "syncingTo" : "", "syncSourceHost" : "", "syncSourceId" : -1, "heartbeatIntervalMillis" : NumberLong(2000), "optimes" : { "lastCommittedOpTime" : { "ts" : Timestamp(1547811537, 1), "t" : NumberLong(2) }, "readConcernMajorityOpTime" : { "ts" : Timestamp(1547811537, 1), "t" : NumberLong(2) }, "appliedOpTime" : { "ts" : Timestamp(1547811537, 1), "t" : NumberLong(2) }, "durableOpTime" : { "ts" : Timestamp(1547811537, 1), "t" : NumberLong(2) } }, "lastStableCheckpointTimestamp" : Timestamp(1547811517, 1), "members" : [ { "_id" : 0, "name" : "localhost:27017", "health" : 1, "state" : 1, "stateStr" : "PRIMARY", "uptime" : 736, "optime" : { "ts" : Timestamp(1547811537, 1), "t" : NumberLong(2) }, "optimeDate" : ISODate("2019-01-18T11:38:57Z"), "syncingTo" : "", "syncSourceHost" : "", "syncSourceId" : -1, "infoMessage" : "", "electionTime" : Timestamp(1547810805, 1), "electionDate" : ISODate("2019-01-18T11:26:45Z"), "configVersion" : 1, "self" : true, "lastHeartbeatMessage" : "" } ], "ok" : 1, "operationTime" : Timestamp(1547811537, 1), "$clusterTime" : { "clusterTime" : Timestamp(1547811537, 1), "signature" : { "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="), "keyId" : NumberLong(0) } } } MongoDB Enterprise test:PRIMARY>

步骤 4. 下载“ https://github.com/rwynn/monstache/releases ”。解压缩下载并调整 PATH 变量以包含您平台的文件夹路径。转到 cmd 并输入"monstache -v" # 4.13.1 Monstache 使用 TOML 格式进行配置。配置名为 config.toml 的迁移文件

步骤 5。

我的 config.toml -->

shell
mongo-url = "mongodb://127.0.0.1:27017/?replicaSet=test" elasticsearch-urls = ["http://localhost:9200"] direct-read-namespaces = [ "admin.users" ] gzip = true stats = true index-stats = true elasticsearch-max-conns = 4 elasticsearch-max-seconds = 5 elasticsearch-max-bytes = 8000000 dropped-collections = false dropped-databases = false resume = true resume-write-unsafe = true resume-name = "default" index-files = false file-highlighting = false verbose = true exit-after-direct-reads = false index-as-update=true index-oplog-time=true

步骤 6。

shell
D:\15-1-19>monstache -f config.toml

蒙斯塔奇奔跑...

在 Elasticsearch 确认迁移的数据

在 Mongo 添加记录

Monstache 捕获事件并将数据迁移到elasticsearch

2024年6月29日 12:07 回复

我发现 mongo-connector 很有用。它来自 Mongo Labs (MongoDB Inc.),现在可以与 Elasticsearch 2.x 一起使用

Elastic 2.x 文档管理器:https://github.com/mongodb-labs/elastic2-doc-manager

mongo-connector 创建从 MongoDB 集群到一个或多个目标系统的管道,例如 Solr、Elasticsearch 或另一个 MongoDB 集群。它将 MongoDB 中的数据同步到目标,然后跟踪 MongoDB oplog,实时跟上 MongoDB 中的操作。它已使用 Python 2.6、2.7 和 3.3+ 进行了测试。详细的文档可以在 wiki 上找到。

https://github.com/mongodb-labs/mongo-connector https://github.com/mongodb-labs/mongo-connector/wiki/Usage%20with%20ElasticSearch

2024年6月29日 12:07 回复

Elasticsearch 与 MongoDB 结合使用的一般步骤

要使用 Elasticsearch 搜索 MongoDB 中的数据,您需要执行以下步骤:

  1. 数据同步 使用 MongoDB 的数据同步工具(例如MongoDB Connector for Elasticsearch或者Logstash的MongoDB插件)将数据从 MongoDB 同步到 Elasticsearch。这些工具会监听 MongoDB 的 oplog 功能来捕捉数据更改,并将这些更改实时同步到 Elasticsearch。
  2. 配置同步工具 配置同步工具以确定哪些集合(collections)或文档(documents)需要被同步到 Elasticsearch。通常,这涉及到设置数据流水线(pipeline),定义字段映射(mapping),以及可能的转换和过滤。
  3. 索引数据 将 MongoDB 数据索引到 Elasticsearch 中。索引是数据组织的方式,以便能够快速进行全文搜索。每个 MongoDB 文档都会变成 Elasticsearch 索引中的一个文档。
  4. 查询数据 使用 Elasticsearch 的查询语言(如 Query DSL)来对索引的数据进行搜索。Elasticsearch 提供了广泛的搜索功能,包括全文搜索、复合查询、过滤器和聚合等。
  5. 结果展示 将搜索结果展示给用户。这可能涉及到从 Elasticsearch 获取结果后再进行必要的后处理,以适配应用程序的展示需求。

示例案例

假设我们有一个 MongoDB 集合,存储了电子商务网站的产品信息。我们希望在 Elasticsearch 中建立一个可以进行全文搜索的商品索引。

以下是具体步骤的示例:

  1. 安装 MongoDB Connector for Elasticsearch 首先,我们需要安装并配置 MongoDB Connector for Elasticsearch,这是一个官方插件,可以将 MongoDB 集合的数据实时同步到 Elasticsearch。

  2. 配置同步 在 MongoDB Connector 中,我们定义了需要同步的数据库和集合,以及如何将 MongoDB 的文档结构映射到 Elasticsearch 的索引结构。例如,我们需要同步 products 集合,将产品名称、描述和价格等字段同步到 ES。

  3. 监控和维护 在数据同步过程中,我们需要监控同步任务以确保数据的一致性,并对错误或中断进行相应的处理。

  4. 编写搜索查询 当数据同步完毕后,我们可以使用 Elasticsearch 的查询 DSL 来编写搜索查询。例如,如果我们想搜索描述中包含“智能手机”的所有商品,可以编写如下查询:

    json
    { "query": { "match": { "description": "智能手机" } } }
  5. 整合到应用程序 最后,将 Elasticsearch 的搜索功能集成到应用程序中,确保用户能够发出搜索请求,并展示搜索结果。

使用 Elasticsearch 搜索 MongoDB 的数据可以有效地提高搜索性能和用户体验,同时保持 MongoDB 的高效数据存储和管理能力。在实际操作过程中,还需要考虑数据一致性、容错和性能优化等因素。

2024年6月29日 12:07 回复

你的答案