Solution 1

Your problem is that filter_by takes keyword arguments, but filter takes expressions. So expanding a dict for filter_by **mydict will work. With filter, you normally pass it one argument, which happens to be an expression. So when you expand your **filters dict to filter, you pass filter a bunch of keyword arguments that it doesn't understand.

If you want to build up a set of filters from a dict of stored filter args, you can use the generative nature of the query to keep applying filters. For example:

# assuming a model class, User, with attributes, name_last, name_first
my_filters = {'name_last':'Duncan', 'name_first':'Iain'}
query = session.query(User)
for attr,value in my_filters.iteritems():
    query = query.filter( getattr(User,attr)==value )
# now we can run the query
results = query.all()

The great thing about the above pattern is you can use it across multiple joined columns, you can construct 'ands' and 'ors' with and_ and or_, you can do <= or date comparisons, whatever. It's much more flexible than using filter_by with keywords. The only caveat is that for joins you have to be a bit careful you don't accidentally try to join a table twice, and you might have to specify the join condition for complex filtering. I use this in some very complex filtering over a pretty involved domain model and it works like a charm, I just keep a dict going of entities_joined to keep track of the joins.

Solution 3

FWIW, There's a Python library designed to solve this exact problem: sqlalchemy-filters

It allows to dynamically filter using all operators, not only ==.

from sqlalchemy_filters import apply_filters

# `query` should be a SQLAlchemy query object

filter_spec = [{'field': 'name', 'op': '==', 'value': 'name_1'}]
filtered_query = apply_filters(query, filter_spec)

more_filters = [{'field': 'foo_id', 'op': 'is_not_null'}]
filtered_query = apply_filters(filtered_query, more_filters)

result = filtered_query.all()

Solution 4

class Place(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    search_id = db.Column(db.Integer, db.ForeignKey('search.id'), nullable=False)

    def dynamic_filter(model_class, filter_condition):
        Return filtered queryset based on condition.
        :param query: takes query
        :param filter_condition: Its a list, ie: [(key,operator,value)]
        operator list:
            eq for ==
            lt for <
            ge for >=
            in for in_
            like for like
            value could be list or a string
        :return: queryset
        __query = db.session.query(model_class)
        for raw in filter_condition:
                key, op, value = raw
            except ValueError:
                raise Exception('Invalid filter: %s' % raw)
            column = getattr(model_class, key, None)
            if not column:
                raise Exception('Invalid filter column: %s' % key)
            if op == 'in':
                if isinstance(value, list):
                    filt = column.in_(value)
                    filt = column.in_(value.split(','))
                    attr = list(filter(lambda e: hasattr(column, e % op), ['%s', '%s_', '__%s__']))[0] % op
                except IndexError:
                    raise Exception('Invalid filter operator: %s' % op)
                if value == 'null':
                    value = None
                filt = getattr(column, attr)(value)
            __query = __query.filter(filt)
        return __query

Execute like:

places = Place.dynamic_filter([('search_id', 'eq', 1)]).all()
    I'm trying to implement dynamic filtering using SQLAlchemy ORM.

    I was looking through StackOverflow and found very similar question:SQLALchemy dynamic filter_by

    It's useful for me, but not enough.

    So, here is some example of code, I'm trying to write:

    # engine - MySQL engine
    session_maker = sessionmaker(bind=engine)
    session = session_maker()
    # my custom model
    model = User
    def get_query(session, filters):
        if type(filters) == tuple:
            query = session.query(model).filter(*filters)
        elif type(filters) == dict:
            query = session.query(model).filter(**filters)
        return query

    then I'm trying to reuse it with something very similar:

    filters = (User.name == 'Johny')
    get_query(s, filters) # it works just fine
    filters = {'name': 'Johny'}
    get_query(s, filters)

    After the second run, there are some issues:

    TypeError: filter() got an unexpected keyword argument 'name'

    When I'm trying to change my filters to:

    filters = {User.name: 'Johny'}

    it returns:

    TypeError: filter() keywords must be strings

    But it works fine for manual querying:

    s.query(User).filter(User.name == 'Johny')

    What is wrong with my filters?

    BTW, it looks like it works fine for case:

    filters = {'name':'Johny'}

    But following the recommendations from mentioned post I'm trying to use just filter.

    If it's just one possible to use filter_by instead of filter, is there any differences between these two methods?

