MongoDB聚合操作符:$accumulator

$accumulator可以定义自定义累加器操作符。累加器是一种操作符,可在文档通过管道时保持其状态(如:总数、最大值、最小值和相关数据)。$accumulator操作符支持执行自定义的JavaScript函数,可以实现MongoDB查询语言不支持的行为。

$accumulator支持下列阶段:

  • $bucket
  • $bucketAuto
  • $group

**注意:**在聚合操作符内部执行JavaScript可能会降低性能,只有在MongoDB提供的管道操作符无法满足需求时才考虑使用$accumulator

语法

{
  $accumulator: {
    init: <code>,
    initArgs: <array expression>,        // 可选
    accumulate: <code>,
    accumulateArgs: <array expression>,
    merge: <code>,
    finalize: <code>,                    // 可选
    lang: <string>
  }
}

字段说明:

  • init
    字符串或代码,用于初始化状态的函数。init函数从initArgs数组接受参数,可以使用BSON类型代码或字符串来定义函数,init函数形式如下:

      function (<initArg1>, <initArg2>, ...) {
          ...
          return <initialState>
      }
    

    **注意:**溢出到磁盘或在分片集群上运行查询会导致累加器以多个子累加器合并的形式计算,每个函数都以调用init()开始。要确保init()accumulative()merge()函数与此执行模式兼容。

  • initArgs
    数组,可选,传递给init函数的参数。initArgs为下面的形式:

    [ <initArg1>, <initArg2>, ... ]
    

    **注意:**应用于$bucketAuto阶段时,initArgs不能应用分组键(也就是说不能使用$<fieldName>语法),换而言之,在$bucketAuto阶段,只能给initArgs指定常量。

  • accumulate
    字符串或代码,用于文档累加的函数,accumulate函数从accumulateArgs数组表达式接受当前状态和参数,accumulate函数的返回值变为新状态,函数定义可以为BSON类型的代码或字符串。accumulate函数的形式如下:

    function(state, <accumArg1>, <accumArg2>, ...) {
      ...
      return <newState>
    }
    
  • accumulateArgs
    传递给accumulate函数的参数,可以使用accumulateArgs来制定传递给accmulate函数的字段值。accumulateArgs的形式如下;

    [ <accumArg1>, <accumArg2>, ... ]
    
  • merge
    字符串或代码,用于合并两个内部状态的函数,merge必须是字符串或BSON代码类型,merge返回两个状态合并后的结果,mere函数的形式如下:

    function (<state1>, <state2>) {
      <logic to merge state1 and state2>
      return <newState>
    }
    
  • finalize
    字符串或代码,可选,用于更新累加结果的函数。finalize函数的形式如下:

    function (state) {
      ...
      return <finalState>
    }
    
    
  • lang
    字符串类型,accumulator代码使用的语言。目前仅支持js

使用

下面是$accumulator操作符处理文档的过程和步骤:

  1. 操作符从初始状态开始,由init函数定义。
  2. 操作符基于accumulate指定的函数更新每个文档的状态。
  3. 当操作符需要合并多个中间状态时,会执行merge函数。
  4. 如果定义了finalize函数,一旦所有文档被处理完并且状态因此更新,最终确定将状态到最终输出。

使用$merge合并两个状态

作为其内部运作的一部分,累加器操作符可能需要合并两个独立的中间状态。合并函数指定操作符应如何合并两个状态。

例如,下面的情况下,$accumulator可能需要合并两个状态:

  • $accumulator运行在分片集群上,操作符需要合并每个分片的结果,以得到的最终结果。
  • 单个$accumulator操作超出了它指定的内存限制,如果指定了alloDiskUse选项,操作符将正在进行的操作放在磁盘上并在内存中完成操作。一旦操作完成,磁盘上、内存中的的结果将由merge函数进行合并。

**注意:**合并函数总是一次合并两个状态。当有两个以上状态需要合并时,则会先合并两个状态为一个状态,然后重复这一过程,直到所有的状态都被合并。也就是说每次合并只能合并两个状态。

内嵌javascript

要使用$accumulator,必须启用服务端脚本选项。如果不使用$accumulator(或$function$where、或mapRecuce),可以禁用服务端脚本:

  • 对于mongod实例,可以参考security.javascriptEnabled选项配置或--noscripting命令行选项。
  • 对于mongos实例,可以参考security.javascriptEnabled选项配置或--noscripting命令行选项(从MongoDB4.4开始)。
    对于更早的版本,MongoDB不允许在mongos实例上执行JavaScript。

不支持的数组和字符串函数

MongoDB6.0升级了用于服务端JavaScript的内部JavaScript引擎,包括$accumulator$function$where表达式,从MozJS-60升级到了MozJS-91MozJS-60中存在的一些不推荐使用的非标准数组和字符串函数在MozJS-91中被删除。

举例

$accumulator实现$avg操作符

**注意:**本例使用$accumulator实现的$avg在MongoDB中已经支持,本例的目的不是去实现一个新功能,而是演示accumulator操作符常见逻辑的行为和语法。

mongosh,使用下面的文档创建一个books集合:

db.books.insertMany([
  { "_id" : 8751, "title" : "The Banquet", "author" : "Dante", "copies" : 2 },
  { "_id" : 8752, "title" : "Divine Comedy", "author" : "Dante", "copies" : 1 },
  { "_id" : 8645, "title" : "Eclogues", "author" : "Dante", "copies" : 2 },
  { "_id" : 7000, "title" : "The Odyssey", "author" : "Homer", "copies" : 10 },
  { "_id" : 7020, "title" : "Iliad", "author" : "Homer", "copies" : 10 }
])

下面的操作使用author对文档进行分组,并使用$accumulator计算每个作者书籍的平均拷贝数。

db.books.aggregate([
{
  $group :
  {
    _id : "$author",
    avgCopies:
    {
      $accumulator:
      {
        init: function() {                        // 设置初始状态
          return { count: 0, sum: 0 }
        },

        accumulate: function(state, numCopies) {  // 定义如何更新状态
          return {
            count: state.count + 1,
            sum: state.sum + numCopies
          }
        },

        accumulateArgs: ["$copies"],              // accumulate函数需要的参数

        merge: function(state1, state2) {         // 当操作符执行合并
          return {                                // 两个状态加到字段
            count: state1.count + state2.count,
            sum: state1.sum + state2.sum
          }
        },

        finalize: function(state) {               // 收集所有文档的结果后
          return (state.sum / state.count)        // 计算平均值
        },
        lang: "js"
      }
    }
  }
}
])
结果

操作返回下面的结果:

{ "_id" : "Dante", "avgCopies" : 1.6666666666666667 }
{ "_id" : "Homer", "avgCopies" : 10 }
过程分析

$accumulator定义了一个初始状态,其中countsum都设置为0$accumulator对每个文档使用以下方式更新状态:

  • count每次加1
  • 将文档copies字段的值加到sum,通过accumulateArgs指定的参数,累加器函数可以访问copies字段。

当所有的文档都被处理完后,accumulate函数返回更新后的状态。

一旦所有的文档被处理完,finilize函数使用拷贝的sum值除以文档数量count得到平均值。这样就不需要保持运行计算后的平均值,因为finilize函数会接收到sumcount的累计值。

对比$avg

下面的操作使用了$avg操作符,与上面的方法是等价的:

db.books.aggregate([
{
  $group : {
    _id : "$author",
    avgCopies: { $avg: "$copies" }
  }
}
])

使用initArgs按分组改变初始状态

可以使用initArgs选项去改变$accumulator的初始状态,在某些情况下是比较有用的,比如:

  • 使用状态中没有的字段值来影响状态
  • 根据正在处理的组,将初始状态设置为不同的值。

restaurants集合有下面的内容:

db.restaurants.insertMany([
  { "_id" : 1, "name" : "Food Fury", "city" : "Bettles", "cuisine" : "American" },
  { "_id" : 2, "name" : "Meal Macro", "city" : "Bettles", "cuisine" : "Chinese" },
  { "_id" : 3, "name" : "Big Crisp", "city" : "Bettles", "cuisine" : "Latin" },
  { "_id" : 4, "name" : "The Wrap", "city" : "Onida", "cuisine" : "American" },
  { "_id" : 5, "name" : "Spice Attack", "city" : "Onida", "cuisine" : "Latin" },
  { "_id" : 6, "name" : "Soup City", "city" : "Onida", "cuisine" : "Chinese" },
  { "_id" : 7, "name" : "Crave", "city" : "Pyote", "cuisine" : "American" },
  { "_id" : 8, "name" : "The Gala", "city" : "Pyote", "cuisine" : "Chinese" }
])

假设一个应用允许用户使用这些数据去查找饭店,或许显示更多与用户居住城市相关的饭店会更有用,在这个例子中,假定用户所在的城市是userProfileCity变量。

下面的聚合管道按照city进行分组,操作使用$accumulator来显示与用户资料匹配的城市餐厅的数量:

**注意:**如果在mongosh中执行,需要把initArgs<userProfileCity>替换为实际城市的字符串值,如:“Bettles”。

db.restaurants.aggregate([
{
  $group :
  {
    _id : { city: "$city" },
    restaurants:
    {
      $accumulator:
      {
        init: function(city, userProfileCity) {        // 设置初始状态
          return {
            max: city === userProfileCity ? 3 : 1,     // 如果分组匹配到用户的城市, 返回3个饭店
            restaurants: []                            // 否则, 返回1个饭店
          }
        },

        initArgs: ["$city", <userProfileCity>],        // 传递给init函数的参数

        accumulate: function(state, restaurantName) {  // 定义如何更新状态
          if (state.restaurants.length < state.max) {
            state.restaurants.push(restaurantName);
          }
          return state;
        },

        accumulateArgs: ["$name"],                     // accumulate函数需要的参数

        merge: function(state1, state2) {
          return {
            max: state1.max,
            restaurants: state1.restaurants.concat(state2.restaurants).slice(0, state1.max)
          }
        },

        finalize: function(state) {                   // 调整状态,只返回需要的字段
          return state.restaurants
        }

        lang: "js"
      }
    }
  }
}
])
返回结果

如果userProfileCityBettles,操作返回下面的结果:

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury", "Meal Macro", "Big Crisp" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

如果userProfileCityOnida,操作返回下面的结果:

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap", "Spice Attack", "Soup City" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }

如果userProfileCity为是别的值,操作返回下面的结果:

{ "_id" : { "city" : "Bettles" }, "restaurants" : { "restaurants" : [ "Food Fury" ] } }
{ "_id" : { "city" : "Onida" }, "restaurants" : { "restaurants" : [ "The Wrap" ] } }
{ "_id" : { "city" : "Pyote" }, "restaurants" : { "restaurants" : [ "Crave" ] } }
执行过程分析

初始化函数init定义了初始状态,包含了maxrestaurants字段,max字段为指定分组设置饭店的最大数量。如果文档的city字段与userProfileCity匹配,分组最多包含3个饭店,否则,如果文档_iduserProfileCity不匹配,分组最多包含一个饭店。init函数从initArgs数组接受cityuserProfileCity参数。

对于$accumulator处理的每个文档,会把饭店的name放到restaurants数组,前提是名称不会让restaurants的长度超过max的值。在所有的文档都被处理完成后,accumulate函数返回更新后的状态。

merge函数定义了如何合并两个状态,它将每个状态的restaurant连接在一起,并且结果数组的长度使用slice方法进行了限制,确保它不会超过max的值。

一旦所有的文档被处理完,finalize函数修改结果状态,值返回饭店的名称,如果没有这个函数,max字段也会包含在输出中,这将不能满足应用的需求。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
THE END
分享
二维码
< <上一篇
下一篇>>