Spring Data MongoDB Lookup with Pipeline Aggregation

Solution 1

Building on the info given by @dnickless, I was able to solve this. I'll post the complete solution in the hope that it helps someone else in the future.

I'm using mongodb-driver:3.6.4

First, I had to create a custom aggregation operation class so that I could pass a custom JSON MongoDB stage into the aggregation. This allows me to use a pipeline within $lookup, which is not supported by the API of the driver version I am using.

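import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class CustomProjectAggregationOperation implements AggregationOperation {
    private final String jsonOperation;

    public CustomProjectAggregationOperation(String jsonOperation) {
        this.jsonOperation = jsonOperation;
    }

    @Override
    public Document toDocument(AggregationOperationContext aggregationOperationContext) {
        // Parse the raw JSON stage, then let Spring map field names against the aggregation's input type
        return aggregationOperationContext.getMappedObject(Document.parse(jsonOperation));
    }
}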

Now that we can pass a custom JSON stage into our Spring Data MongoDB aggregation, all that is left is to plug it into a TypedAggregation query:

public List<FulfillmentChannel> getFulfillmentChannels(
    String SOME_VARIABLE_STRING_1, 
    String SOME_VARIABLE_STRING_2) {

    AggregationOperation match = Aggregation.match(
            Criteria.where("dayOfWeek").is(SOME_VARIABLE_STRING_1));
    AggregationOperation match2 = Aggregation.match(
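            Criteria.where("deliveryZipCodeTimings").ne(Collections.emptyList()));
    // SOME_VARIABLE_STRING_2 is concatenated into the JSON as-is, so it must not contain quotes or backslashes
    String query =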
            "{ $lookup: { " +
                    "from: 'deliveryZipCodeTiming'," +
                    "let: { location_id: '$fulfillmentLocationId' }," +
                    "pipeline: [{" +
                    "$match: {$expr: {$and: [" +
                    "{ $eq: ['$fulfillmentLocationId', '$$location_id']}," +
                    "{ $eq: ['$zipCode', '" + SOME_VARIABLE_STRING_2 + "']}]}}}," +
                    "{ $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }]," +
                    "as: 'deliveryZipCodeTimings'}}";

    TypedAggregation<FulfillmentChannel> aggregation = Aggregation.newAggregation(
            FulfillmentChannel.class,
            match,
            new CustomProjectAggregationOperation(query),
            match2
    );

    AggregationResults<FulfillmentChannel> results = 
        mongoTemplate.aggregate(aggregation, FulfillmentChannel.class);
    return results.getMappedResults();
}
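Because SOME_VARIABLE_STRING_2 is concatenated straight into the JSON text above, a value containing a quote or a backslash would break the stage or change its meaning. A minimal sketch, assuming the value is an arbitrary Java string: a hypothetical helper, jsonString, that returns a double-quoted JSON string literal with the characters JSON requires escaped. Document.parse accepts double-quoted strings just like the single-quoted ones used above.

```java
/** Hypothetical helper: quotes a Java string as a JSON string literal (RFC 8259 escaping). */
final class JsonQuoting {

    static String jsonString(String value) {
        StringBuilder sb = new StringBuilder(value.length() + 2).append('"');
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters are written as a four-hex-digit unicode escape
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }
}
```

With it, the zip-code clause would read "{ $eq: ['$zipCode', " + JsonQuoting.jsonString(SOME_VARIABLE_STRING_2) + "]}]}}}," +. Building the stage from Document objects, as in Solution 4 below, sidesteps escaping altogether.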

Solution 2

The drivers are almost always a little behind the language features that MongoDB provides, so some of the latest features are simply not accessible through the API yet. I am afraid this is one of those cases, and you'll need to resort to using strings. Something like this (untested):

AggregationOperation match = Aggregation.match(Criteria.where("dayOfWeek").is("SOME_VARIABLE_STRING_1"));
AggregationOperation match2 = Aggregation.match(Criteria.where("deliveryZipCodeTimings").ne(Collections.emptyList()));
String query = "{ $lookup: { from: 'deliveryZipCodeTiming', let: { location_id: '$fulfillmentLocationId' }, pipeline: [{ $match: { $expr: { $and: [ { $eq: ['$fulfillmentLocationId', '$$location_id']}, { $eq: ['$zipCode', 'SOME_VARIABLE_STRING_2']} ]} } }, { $project: { _id: 0, zipCode: 1, cutoffTime: 1 } }], as: 'deliveryZipCodeTimings' } }";
// newAggregation() accepts AggregationOperation instances, not a DBObject, so wrap the parsed stage in one
AggregationOperation lookup = context -> Document.parse(query);
Aggregation.newAggregation(match, lookup, match2);
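To make the semantics of the let/pipeline form of $lookup concrete, here is a plain-Java sketch (Java 9+) that reproduces on in-memory data what the three stages in the answers above compute: the outer $match, the correlated $lookup (where $$location_id is bound to each channel's fulfillmentLocationId), and the final $match on a non-empty array. It is an illustration only, not how MongoDB executes the query; documents are modelled as Map<String, Object>, the class name is hypothetical, and MongoDB's null/missing-field comparison rules are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

/** In-memory simulation of $match -> $lookup(let, pipeline) -> $match (illustration only). */
final class LookupSimulation {

    static List<Map<String, Object>> run(List<Map<String, Object>> channels,
                                         List<Map<String, Object>> timings,
                                         String dayOfWeek, String zipCode) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> channel : channels) {
            // Stage 1: { $match: { dayOfWeek: ... } }
            if (!Objects.equals(channel.get("dayOfWeek"), dayOfWeek)) continue;

            // Stage 2: let { location_id: '$fulfillmentLocationId' } binds this value as $$location_id
            Object locationId = channel.get("fulfillmentLocationId");
            List<Map<String, Object>> joined = timings.stream()
                    // Inner $match: $expr/$and of the two $eq conditions
                    .filter(t -> Objects.equals(t.get("fulfillmentLocationId"), locationId)
                            && Objects.equals(t.get("zipCode"), zipCode))
                    // Inner $project: { _id: 0, zipCode: 1, cutoffTime: 1 }
                    .map(t -> {
                        Map<String, Object> projected = new LinkedHashMap<>();
                        for (String field : List.of("zipCode", "cutoffTime")) {
                            if (t.containsKey(field)) projected.put(field, t.get(field));
                        }
                        return projected;
                    })
                    .collect(Collectors.toList());

            Map<String, Object> result = new LinkedHashMap<>(channel);
            result.put("deliveryZipCodeTimings", joined); // the 'as' field

            // Stage 3: { $match: { deliveryZipCodeTimings: { $ne: [] } } }
            if (!joined.isEmpty()) out.add(result);
        }
        return out;
    }
}
```

Channels whose joined array is empty are dropped by the last stage, while matching channels keep their timings grouped in a single array, which is the shape the question wants by not using $unwind.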

Solution 3

I would like to add my own solution, which in some respects repeats the solutions posted above.

Mongo driver v3.x

For Mongo driver v3.x I came to the following solution:

import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.util.JSON;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    public JsonOperation(String json) {
        Object root = JSON.parse(json);

        // BasicDBObject already implements Map<String, Object>, so it can be wrapped in a Document directly
        documents = root instanceof BasicDBObject
                    ? Collections.singletonList(new Document((BasicDBObject) root))
                    : ((BasicDBList) root).stream()
                            .map(item -> new Document((BasicDBObject) item))
                            .collect(Collectors.toList());
    }

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }
}

Then, provided that the aggregation stages are stored in a resource such as aggregations.json:

[
  {
    $match: {
      "userId": "..."
    }
  },
  {
    $lookup: {
      let: {
        ...
      },
      from: "another_collection",
      pipeline: [
        ...
      ],
      as: "things"
    }
  },
  {
    $sort: {
      "date": 1
    }
  }
]

the class above can be used as follows:

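import static org.apache.commons.io.IOUtils.resourceToString; // assumed to be Apache Commons IO
import static org.springframework.data.mongodb.core.aggregation.Aggregation.newAggregation;

// resourceToString() expects an absolute resource name, hence the leading '/'
Collection<ResultDao> results = mongoTemplate.aggregate(newAggregation(new JsonOperation(resourceToString("/aggregations.json", StandardCharsets.UTF_8))),
        "some_collection", ResultDao.class).getMappedResults();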
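If you would rather not pull in a library just to read aggregations.json, the JDK can do it. A minimal sketch, assuming Java 9+ (for InputStream.readAllBytes) and UTF-8 content; the class and method names are hypothetical:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: reads a classpath resource into a String using only the JDK. */
final class ResourceText {

    static String readResource(String name) throws IOException {
        // Class.getResourceAsStream resolves names starting with '/' from the classpath root
        try (InputStream in = ResourceText.class.getResourceAsStream(name)) {
            if (in == null) {
                throw new IOException("Resource not found: " + name);
            }
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }
}
```

It would be called as new JsonOperation(ResourceText.readResource("/aggregations.json")); the caller has to handle the checked IOException.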

Mongo driver v4.x

As the JSON class (com.mongodb.util.JSON) was removed in Mongo driver v4, I have rewritten the class as follows:

import java.util.Collections;
import java.util.List;

import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

public class JsonOperation implements AggregationOperation {

    private List<Document> documents;

    private static final String DUMMY_KEY = "dummy";

    public JsonOperation(String json) {
        documents = parseJson(json);
    }

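    static List<Document> parseJson(String json) {
        String trimmed = json.trim(); // tolerate leading whitespace/newlines, e.g. from a resource file
        // Document.parse() only accepts a JSON object, so a top-level array is wrapped under a dummy key
        return trimmed.startsWith("[")
                    ? Document.parse("{\"" + DUMMY_KEY + "\": " + trimmed + "}").getList(DUMMY_KEY, Document.class)
                    : Collections.singletonList(Document.parse(trimmed));
    }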

    @Override
    public Document toDocument(AggregationOperationContext context) {
        // Not necessary to return anything as we override toPipelineStages():
        return null;
    }

    @Override
    public List<Document> toPipelineStages(AggregationOperationContext context) {
        return documents;
    }

    @Override
    public String getOperator() {
        return documents.iterator().next().keySet().iterator().next();
    }
}

The implementation is now a bit ugly because of the string manipulation. If somebody has a more elegant way to parse an array of objects, please edit this post or drop a comment. Ideally, there would be a method in the Mongo core library that parses either a JSON object or a JSON array (returning BasicDBObject/BasicDBList or Document/List<Document>).
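The dummy-key trick in parseJson() is plain string handling: Document.parse() only reads a JSON object, so a top-level array is wrapped as the value of a single key and read back with getList(). One weak spot is the detection itself, since a bare startsWith("[") on untrimmed input misses an array that begins with whitespace or a newline, as a resource file easily can. A sketch of just that step, with hypothetical helper names and the same "dummy" key as the answer:

```java
/** Hypothetical helpers isolating the "dummy key" trick used by parseJson(). */
final class JsonRootDetection {

    private static final String DUMMY_KEY = "dummy";

    /** Returns true if the first non-whitespace character is '[' (a top-level JSON array). */
    static boolean isTopLevelArray(String json) {
        for (int i = 0; i < json.length(); i++) {
            char c = json.charAt(i);
            if (!Character.isWhitespace(c)) {
                return c == '[';
            }
        }
        return false; // blank input is not an array
    }

    /** Wraps an array as {"dummy": [...]} so a parser that only accepts objects can read it. */
    static String wrapIfArray(String json) {
        return isTopLevelArray(json)
                ? "{\"" + DUMMY_KEY + "\": " + json.trim() + "}"
                : json;
    }
}
```

parseJson() could then call Document.parse(JsonRootDetection.wrapIfArray(json)).getList(DUMMY_KEY, Document.class) whenever isTopLevelArray(json) is true.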

Also note that I skipped mapping the Document instances through the context in toPipelineStages(), as it is not necessary in my case. If you need that step, the method looks like this:

@Override
public List<Document> toPipelineStages(AggregationOperationContext context) {
    return documents.stream().map(document -> context.getMappedObject(document)).collect(Collectors.toList());
}

Solution 4

I faced some JSON parsing exceptions when I used the approach explained in the accepted answer, so I dug into the default MongoDB Java driver (version 3) Document class to build the aggregation query, and found that any aggregation query can be built up as follows.

Replace each element of the mongo console query as follows:

  1. Curly brace ({) -> new Document()
  2. Parameter names stay the same
  3. Colon (:) -> comma (,)
  4. Comma (,) -> .append(), e.g. { a: 1, b: 2 } -> new Document("a", 1).append("b", 2)
  5. Square bracket ([) -> Arrays.asList()

import java.util.Arrays;
import org.bson.Document;
import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
import org.springframework.data.mongodb.core.aggregation.AggregationOperationContext;

AggregationOperation customLookupOperation = new AggregationOperation() {
    @Override
    public Document toDocument(AggregationOperationContext context) {
        // { $lookup: { from: ..., let: ..., pipeline: [...], as: ... } }
        return new Document("$lookup",
                new Document("from", "deliveryZipCodeTiming")
                        .append("let", new Document("location_id", "$fulfillmentLocationId"))
                        .append("pipeline", Arrays.<Object>asList(
                                new Document("$match", new Document("$expr", new Document("$and",
                                        Arrays.<Object>asList(
                                                new Document("$eq", Arrays.<Object>asList("$fulfillmentLocationId", "$$location_id")),
                                                new Document("$eq", Arrays.<Object>asList("$zipCode", "SOME_VARIABLE_STRING_2"))
                                        )))),
                                new Document("$project", new Document("_id", 0)
                                        .append("zipCode", 1)
                                        .append("cutoffTime", 1))))
                        .append("as", "deliveryZipCodeTimings"));
    }
};

Finally, you can use the aggregation operation in the aggregation pipeline:

Aggregation aggregation = Aggregation.newAggregation(matchOperation, customLookupOperation, matchOperation2);
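Because org.bson.Document implements Map<String, Object> and keeps insertion order, the translation rules above amount to building nested ordered maps and lists. The sketch below builds the same $lookup stage with plain JDK types, so the structure can be checked without a MongoDB dependency. doc(...) is a hypothetical stand-in for new Document(k1, v1).append(k2, v2), and toJson is a deliberately minimal renderer (no string escaping) used only to compare the result with the console query.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Sketch of the translation rules using plain ordered maps (no MongoDB dependency). */
final class StageBuilder {

    /** Hypothetical stand-in for new Document(k1, v1).append(k2, v2)...: alternating keys and values. */
    static Map<String, Object> doc(Object... keysAndValues) {
        if (keysAndValues.length % 2 != 0) {
            throw new IllegalArgumentException("expected key/value pairs");
        }
        Map<String, Object> map = new LinkedHashMap<>(); // insertion order, like org.bson.Document
        for (int i = 0; i < keysAndValues.length; i += 2) {
            map.put((String) keysAndValues[i], keysAndValues[i + 1]);
        }
        return map;
    }

    /** Minimal renderer for String/Number/Map/List values; does not escape strings. */
    static String toJson(Object value) {
        if (value instanceof Map) {
            return ((Map<?, ?>) value).entrySet().stream()
                    .map(e -> "\"" + e.getKey() + "\": " + toJson(e.getValue()))
                    .collect(Collectors.joining(", ", "{", "}"));
        }
        if (value instanceof List) {
            return ((List<?>) value).stream()
                    .map(StageBuilder::toJson)
                    .collect(Collectors.joining(", ", "[", "]"));
        }
        if (value instanceof String) {
            return "\"" + value + "\"";
        }
        return String.valueOf(value); // numbers
    }

    /** The question's $lookup stage: { -> doc(...), [ -> Arrays.asList(...). */
    static Map<String, Object> lookupStage(String zipCode) {
        return doc("$lookup", doc(
                "from", "deliveryZipCodeTiming",
                "let", doc("location_id", "$fulfillmentLocationId"),
                "pipeline", Arrays.asList(
                        doc("$match", doc("$expr", doc("$and", Arrays.asList(
                                doc("$eq", Arrays.asList("$fulfillmentLocationId", "$$location_id")),
                                doc("$eq", Arrays.asList("$zipCode", zipCode)))))),
                        doc("$project", doc("_id", 0, "zipCode", 1, "cutoffTime", 1))),
                "as", "deliveryZipCodeTimings"));
    }
}
```

Printing toJson(lookupStage("SOME_VARIABLE_STRING_2")) gives the console query's $lookup stage in strict JSON form, which makes a misplaced bracket in the builder chain easy to spot.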
Author: Always Learning

Updated on October 02, 2022

Comments

  • Always Learning (over 1 year ago)

    How would I convert the following MongoDB query into a query to be used by my Java Spring application? I can't find a way to use pipeline with the provided lookup method.

    Here is the query I am attempting to convert. I also want to note that I didn't use $unwind as I wanted the deliveryZipCodeTimings to stay as a grouped collection in the return object.

    db.getCollection('fulfillmentChannel').aggregate([
        {
            $match: {
                "dayOfWeek": "SOME_VARIABLE_STRING_1"
            }
        },
        {
            $lookup: {
                from: "deliveryZipCodeTiming",
                let: { location_id: "$fulfillmentLocationId" },
                pipeline: [{
                    $match: {
                        $expr: {
                            $and: [
                                {$eq: ["$fulfillmentLocationId", "$$location_id"]},
                                {$eq: ["$zipCode", "SOME_VARIABLE_STRING_2"]}
                            ]
                        }
                    }
                },
                { 
                    $project: { _id: 0, zipCode: 1, cutoffTime: 1 } 
                }],
                as: "deliveryZipCodeTimings"
            }
        },
        {
            $match: {
                "deliveryZipCodeTimings": {$ne: []}
            }
        }
    ])
    
  • Always Learning (almost 6 years ago)
    Thanks @dnickless, however I don't see any Aggregation.newAggregation overload that takes a DBObject as a parameter. Do you know how I could go about passing a DBObject into Aggregation.newAggregation?
  • dnickless (almost 6 years ago)
    @AlwaysLearning: You are right... Let me google a bit... Without Spring it would be possible like this: stackoverflow.com/questions/36352110/…
  • dnickless (almost 6 years ago)
    I think you can do it this way (by implementing your own JSON string based AggregationOperation class): stackoverflow.com/questions/39393672/…
  • dnickless (almost 6 years ago)
    The solution provided in the above link gives you the greatest flexibility going forward. It would, however, be cleaner to grab the code from this class here: github.com/spring-projects/spring-data-mongodb/blob/master/…, extend it to support what you need, and then create a pull request.
  • Always Learning (almost 6 years ago)
    Thanks @dnickless, your input helped me towards the final solution. Upvoted.
  • Josh Balcitis (over 4 years ago)
    Great answer! This helped a lot with a query I was working on. I did find a slight issue when using dates as part of the match: when dates are used either in the let or directly in the match, the CustomProjectAggregationOperation constructor should accept a Document instead of a String that it needs to parse later. Parsing the string seems to mess up the dates, resulting in either wrong results or no results at all.
  • Farhan (about 4 years ago)
    @AlwaysLearning: I tried using TypedAggregation; however, my AggregationResult is a different POJO than my entity. I get a PropertyReferenceException saying the lookup id doesn't exist on the entity.
  • Mohit Sharma (almost 3 years ago)
    I am not sure why no one is pointing to "ExposedFields". When Spring Data MongoDB parses each step in the pipeline, it looks for the fields exposed by the previous step; with a custom aggregation operation this was failing for me with java.lang.IllegalArgumentException: Invalid reference. I had to implement FieldsExposingAggregationOperation and FieldsExposingAggregationOperation.InheritsFieldsAggregationOperation and provide an implementation of the getFields method: @Override public ExposedFields getFields() { return ExposedFields.synthetic(Fields.fields(this.exposedField)); }