mas_storage_pg/
app_session.rs

1// Copyright 2024 New Vector Ltd.
2// Copyright 2023, 2024 The Matrix.org Foundation C.I.C.
3//
4// SPDX-License-Identifier: AGPL-3.0-only
5// Please see LICENSE in the repository root for full details.
6
7//! A module containing PostgreSQL implementation of repositories for sessions
8
9use async_trait::async_trait;
10use mas_data_model::{CompatSession, CompatSessionState, Device, Session, SessionState, User};
11use mas_storage::{
12    Clock, Page, Pagination,
13    app_session::{AppSession, AppSessionFilter, AppSessionRepository, AppSessionState},
14    compat::CompatSessionFilter,
15    oauth2::OAuth2SessionFilter,
16};
17use oauth2_types::scope::{Scope, ScopeToken};
18use opentelemetry_semantic_conventions::trace::DB_QUERY_TEXT;
19use sea_query::{
20    Alias, ColumnRef, CommonTableExpression, Expr, PostgresQueryBuilder, Query, UnionType,
21};
22use sea_query_binder::SqlxBinder;
23use sqlx::PgConnection;
24use tracing::Instrument;
25use ulid::Ulid;
26use uuid::Uuid;
27
28use crate::{
29    DatabaseError, ExecuteExt,
30    errors::DatabaseInconsistencyError,
31    filter::StatementExt,
32    iden::{CompatSessions, OAuth2Sessions},
33    pagination::QueryBuilderExt,
34};
35
36/// An implementation of [`AppSessionRepository`] for a PostgreSQL connection
37pub struct PgAppSessionRepository<'c> {
38    conn: &'c mut PgConnection,
39}
40
41impl<'c> PgAppSessionRepository<'c> {
42    /// Create a new [`PgAppSessionRepository`] from an active PostgreSQL
43    /// connection
44    pub fn new(conn: &'c mut PgConnection) -> Self {
45        Self { conn }
46    }
47}
48
49mod priv_ {
50    // The enum_def macro generates a public enum, which we don't want, because it
51    // triggers the missing docs warning
52
53    use std::net::IpAddr;
54
55    use chrono::{DateTime, Utc};
56    use sea_query::enum_def;
57    use uuid::Uuid;
58
59    #[derive(sqlx::FromRow)]
60    #[enum_def]
61    pub(super) struct AppSessionLookup {
62        pub(super) cursor: Uuid,
63        pub(super) compat_session_id: Option<Uuid>,
64        pub(super) oauth2_session_id: Option<Uuid>,
65        pub(super) oauth2_client_id: Option<Uuid>,
66        pub(super) user_session_id: Option<Uuid>,
67        pub(super) user_id: Option<Uuid>,
68        pub(super) scope_list: Option<Vec<String>>,
69        pub(super) device_id: Option<String>,
70        pub(super) human_name: Option<String>,
71        pub(super) created_at: DateTime<Utc>,
72        pub(super) finished_at: Option<DateTime<Utc>>,
73        pub(super) is_synapse_admin: Option<bool>,
74        pub(super) user_agent: Option<String>,
75        pub(super) last_active_at: Option<DateTime<Utc>>,
76        pub(super) last_active_ip: Option<IpAddr>,
77    }
78}
79
80use priv_::{AppSessionLookup, AppSessionLookupIden};
81
82impl TryFrom<AppSessionLookup> for AppSession {
83    type Error = DatabaseError;
84
85    #[allow(clippy::too_many_lines)]
86    fn try_from(value: AppSessionLookup) -> Result<Self, Self::Error> {
87        // This is annoying to do, but we have to match on all the fields to determine
88        // whether it's a compat session or an oauth2 session
89        let AppSessionLookup {
90            cursor,
91            compat_session_id,
92            oauth2_session_id,
93            oauth2_client_id,
94            user_session_id,
95            user_id,
96            scope_list,
97            device_id,
98            human_name,
99            created_at,
100            finished_at,
101            is_synapse_admin,
102            user_agent,
103            last_active_at,
104            last_active_ip,
105        } = value;
106
107        let user_session_id = user_session_id.map(Ulid::from);
108
109        match (
110            compat_session_id,
111            oauth2_session_id,
112            oauth2_client_id,
113            user_id,
114            scope_list,
115            device_id,
116            is_synapse_admin,
117        ) {
118            (
119                Some(compat_session_id),
120                None,
121                None,
122                Some(user_id),
123                None,
124                device_id_opt,
125                Some(is_synapse_admin),
126            ) => {
127                let id = compat_session_id.into();
128                let device = device_id_opt
129                    .map(Device::try_from)
130                    .transpose()
131                    .map_err(|e| {
132                        DatabaseInconsistencyError::on("compat_sessions")
133                            .column("device_id")
134                            .row(id)
135                            .source(e)
136                    })?;
137
138                let state = match finished_at {
139                    None => CompatSessionState::Valid,
140                    Some(finished_at) => CompatSessionState::Finished { finished_at },
141                };
142
143                let session = CompatSession {
144                    id,
145                    state,
146                    user_id: user_id.into(),
147                    device,
148                    human_name,
149                    user_session_id,
150                    created_at,
151                    is_synapse_admin,
152                    user_agent,
153                    last_active_at,
154                    last_active_ip,
155                };
156
157                Ok(AppSession::Compat(Box::new(session)))
158            }
159
160            (
161                None,
162                Some(oauth2_session_id),
163                Some(oauth2_client_id),
164                user_id,
165                Some(scope_list),
166                None,
167                None,
168            ) => {
169                let id = oauth2_session_id.into();
170                let scope: Result<Scope, _> =
171                    scope_list.iter().map(|s| s.parse::<ScopeToken>()).collect();
172                let scope = scope.map_err(|e| {
173                    DatabaseInconsistencyError::on("oauth2_sessions")
174                        .column("scope")
175                        .row(id)
176                        .source(e)
177                })?;
178
179                let state = match value.finished_at {
180                    None => SessionState::Valid,
181                    Some(finished_at) => SessionState::Finished { finished_at },
182                };
183
184                let session = Session {
185                    id,
186                    state,
187                    created_at,
188                    client_id: oauth2_client_id.into(),
189                    user_id: user_id.map(Ulid::from),
190                    user_session_id,
191                    scope,
192                    user_agent,
193                    last_active_at,
194                    last_active_ip,
195                    human_name,
196                };
197
198                Ok(AppSession::OAuth2(Box::new(session)))
199            }
200
201            _ => Err(DatabaseInconsistencyError::on("sessions")
202                .row(cursor.into())
203                .into()),
204        }
205    }
206}
207
208/// Split a [`AppSessionFilter`] into two separate filters: a
209/// [`CompatSessionFilter`] and an [`OAuth2SessionFilter`].
210fn split_filter(
211    filter: AppSessionFilter<'_>,
212) -> (CompatSessionFilter<'_>, OAuth2SessionFilter<'_>) {
213    let mut compat_filter = CompatSessionFilter::new();
214    let mut oauth2_filter = OAuth2SessionFilter::new();
215
216    if let Some(user) = filter.user() {
217        compat_filter = compat_filter.for_user(user);
218        oauth2_filter = oauth2_filter.for_user(user);
219    }
220
221    match filter.state() {
222        Some(AppSessionState::Active) => {
223            compat_filter = compat_filter.active_only();
224            oauth2_filter = oauth2_filter.active_only();
225        }
226        Some(AppSessionState::Finished) => {
227            compat_filter = compat_filter.finished_only();
228            oauth2_filter = oauth2_filter.finished_only();
229        }
230        None => {}
231    }
232
233    if let Some(device) = filter.device() {
234        compat_filter = compat_filter.for_device(device);
235        oauth2_filter = oauth2_filter.for_device(device);
236    }
237
238    if let Some(browser_session) = filter.browser_session() {
239        compat_filter = compat_filter.for_browser_session(browser_session);
240        oauth2_filter = oauth2_filter.for_browser_session(browser_session);
241    }
242
243    if let Some(last_active_before) = filter.last_active_before() {
244        compat_filter = compat_filter.with_last_active_before(last_active_before);
245        oauth2_filter = oauth2_filter.with_last_active_before(last_active_before);
246    }
247
248    if let Some(last_active_after) = filter.last_active_after() {
249        compat_filter = compat_filter.with_last_active_after(last_active_after);
250        oauth2_filter = oauth2_filter.with_last_active_after(last_active_after);
251    }
252
253    (compat_filter, oauth2_filter)
254}
255
256#[async_trait]
257impl AppSessionRepository for PgAppSessionRepository<'_> {
258    type Error = DatabaseError;
259
260    #[allow(clippy::too_many_lines)]
261    #[tracing::instrument(
262        name = "db.app_session.list",
263        fields(
264            db.query.text,
265        ),
266        skip_all,
267        err,
268    )]
269    async fn list(
270        &mut self,
271        filter: AppSessionFilter<'_>,
272        pagination: Pagination,
273    ) -> Result<Page<AppSession>, Self::Error> {
274        let (compat_filter, oauth2_filter) = split_filter(filter);
275
276        let mut oauth2_session_select = Query::select()
277            .expr_as(
278                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
279                AppSessionLookupIden::Cursor,
280            )
281            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::CompatSessionId)
282            .expr_as(
283                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2SessionId)),
284                AppSessionLookupIden::Oauth2SessionId,
285            )
286            .expr_as(
287                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::OAuth2ClientId)),
288                AppSessionLookupIden::Oauth2ClientId,
289            )
290            .expr_as(
291                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserSessionId)),
292                AppSessionLookupIden::UserSessionId,
293            )
294            .expr_as(
295                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserId)),
296                AppSessionLookupIden::UserId,
297            )
298            .expr_as(
299                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::ScopeList)),
300                AppSessionLookupIden::ScopeList,
301            )
302            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::DeviceId)
303            .expr_as(
304                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::HumanName)),
305                AppSessionLookupIden::HumanName,
306            )
307            .expr_as(
308                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::CreatedAt)),
309                AppSessionLookupIden::CreatedAt,
310            )
311            .expr_as(
312                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::FinishedAt)),
313                AppSessionLookupIden::FinishedAt,
314            )
315            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::IsSynapseAdmin)
316            .expr_as(
317                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::UserAgent)),
318                AppSessionLookupIden::UserAgent,
319            )
320            .expr_as(
321                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveAt)),
322                AppSessionLookupIden::LastActiveAt,
323            )
324            .expr_as(
325                Expr::col((OAuth2Sessions::Table, OAuth2Sessions::LastActiveIp)),
326                AppSessionLookupIden::LastActiveIp,
327            )
328            .from(OAuth2Sessions::Table)
329            .apply_filter(oauth2_filter)
330            .clone();
331
332        let compat_session_select = Query::select()
333            .expr_as(
334                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
335                AppSessionLookupIden::Cursor,
336            )
337            .expr_as(
338                Expr::col((CompatSessions::Table, CompatSessions::CompatSessionId)),
339                AppSessionLookupIden::CompatSessionId,
340            )
341            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2SessionId)
342            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::Oauth2ClientId)
343            .expr_as(
344                Expr::col((CompatSessions::Table, CompatSessions::UserSessionId)),
345                AppSessionLookupIden::UserSessionId,
346            )
347            .expr_as(
348                Expr::col((CompatSessions::Table, CompatSessions::UserId)),
349                AppSessionLookupIden::UserId,
350            )
351            .expr_as(Expr::cust("NULL"), AppSessionLookupIden::ScopeList)
352            .expr_as(
353                Expr::col((CompatSessions::Table, CompatSessions::DeviceId)),
354                AppSessionLookupIden::DeviceId,
355            )
356            .expr_as(
357                Expr::col((CompatSessions::Table, CompatSessions::HumanName)),
358                AppSessionLookupIden::HumanName,
359            )
360            .expr_as(
361                Expr::col((CompatSessions::Table, CompatSessions::CreatedAt)),
362                AppSessionLookupIden::CreatedAt,
363            )
364            .expr_as(
365                Expr::col((CompatSessions::Table, CompatSessions::FinishedAt)),
366                AppSessionLookupIden::FinishedAt,
367            )
368            .expr_as(
369                Expr::col((CompatSessions::Table, CompatSessions::IsSynapseAdmin)),
370                AppSessionLookupIden::IsSynapseAdmin,
371            )
372            .expr_as(
373                Expr::col((CompatSessions::Table, CompatSessions::UserAgent)),
374                AppSessionLookupIden::UserAgent,
375            )
376            .expr_as(
377                Expr::col((CompatSessions::Table, CompatSessions::LastActiveAt)),
378                AppSessionLookupIden::LastActiveAt,
379            )
380            .expr_as(
381                Expr::col((CompatSessions::Table, CompatSessions::LastActiveIp)),
382                AppSessionLookupIden::LastActiveIp,
383            )
384            .from(CompatSessions::Table)
385            .apply_filter(compat_filter)
386            .clone();
387
388        let common_table_expression = CommonTableExpression::new()
389            .query(
390                oauth2_session_select
391                    .union(UnionType::All, compat_session_select)
392                    .clone(),
393            )
394            .table_name(Alias::new("sessions"))
395            .clone();
396
397        let with_clause = Query::with().cte(common_table_expression).clone();
398
399        let select = Query::select()
400            .column(ColumnRef::Asterisk)
401            .from(Alias::new("sessions"))
402            .generate_pagination(AppSessionLookupIden::Cursor, pagination)
403            .clone();
404
405        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
406
407        let edges: Vec<AppSessionLookup> = sqlx::query_as_with(&sql, arguments)
408            .traced()
409            .fetch_all(&mut *self.conn)
410            .await?;
411
412        let page = pagination.process(edges).try_map(TryFrom::try_from)?;
413
414        Ok(page)
415    }
416
417    #[tracing::instrument(
418        name = "db.app_session.count",
419        fields(
420            db.query.text,
421        ),
422        skip_all,
423        err,
424    )]
425    async fn count(&mut self, filter: AppSessionFilter<'_>) -> Result<usize, Self::Error> {
426        let (compat_filter, oauth2_filter) = split_filter(filter);
427        let mut oauth2_session_select = Query::select()
428            .expr(Expr::cust("1"))
429            .from(OAuth2Sessions::Table)
430            .apply_filter(oauth2_filter)
431            .clone();
432
433        let compat_session_select = Query::select()
434            .expr(Expr::cust("1"))
435            .from(CompatSessions::Table)
436            .apply_filter(compat_filter)
437            .clone();
438
439        let common_table_expression = CommonTableExpression::new()
440            .query(
441                oauth2_session_select
442                    .union(UnionType::All, compat_session_select)
443                    .clone(),
444            )
445            .table_name(Alias::new("sessions"))
446            .clone();
447
448        let with_clause = Query::with().cte(common_table_expression).clone();
449
450        let select = Query::select()
451            .expr(Expr::cust("COUNT(*)"))
452            .from(Alias::new("sessions"))
453            .clone();
454
455        let (sql, arguments) = with_clause.query(select).build_sqlx(PostgresQueryBuilder);
456
457        let count: i64 = sqlx::query_scalar_with(&sql, arguments)
458            .traced()
459            .fetch_one(&mut *self.conn)
460            .await?;
461
462        count
463            .try_into()
464            .map_err(DatabaseError::to_invalid_operation)
465    }
466
467    #[tracing::instrument(
468        name = "db.app_session.finish_sessions_to_replace_device",
469        fields(
470            db.query.text,
471            %user.id,
472            %device_id = device.as_str()
473        ),
474        skip_all,
475        err,
476    )]
477    async fn finish_sessions_to_replace_device(
478        &mut self,
479        clock: &dyn Clock,
480        user: &User,
481        device: &Device,
482    ) -> Result<(), Self::Error> {
483        // TODO need to invoke this from all the oauth2 login sites
484        let span = tracing::info_span!(
485            "db.app_session.finish_sessions_to_replace_device.compat_sessions",
486            { DB_QUERY_TEXT } = tracing::field::Empty,
487        );
488        let finished_at = clock.now();
489        sqlx::query!(
490            "
491                UPDATE compat_sessions SET finished_at = $3 WHERE user_id = $1 AND device_id = $2 AND finished_at IS NULL
492            ",
493            Uuid::from(user.id),
494            device.as_str(),
495            finished_at
496        )
497        .record(&span)
498        .execute(&mut *self.conn)
499        .instrument(span)
500        .await?;
501
502        if let Ok(device_as_scope_token) = device.to_scope_token() {
503            let span = tracing::info_span!(
504                "db.app_session.finish_sessions_to_replace_device.oauth2_sessions",
505                { DB_QUERY_TEXT } = tracing::field::Empty,
506            );
507            sqlx::query!(
508                "
509                    UPDATE oauth2_sessions SET finished_at = $3 WHERE user_id = $1 AND $2 = ANY(scope_list) AND finished_at IS NULL
510                ",
511                Uuid::from(user.id),
512                device_as_scope_token.as_str(),
513                finished_at
514            )
515            .record(&span)
516            .execute(&mut *self.conn)
517            .instrument(span)
518            .await?;
519        }
520
521        Ok(())
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use chrono::Duration;
528    use mas_data_model::Device;
529    use mas_storage::{
530        Pagination, RepositoryAccess,
531        app_session::{AppSession, AppSessionFilter},
532        clock::MockClock,
533        oauth2::OAuth2SessionRepository,
534    };
535    use oauth2_types::{
536        requests::GrantType,
537        scope::{OPENID, Scope},
538    };
539    use rand::SeedableRng;
540    use rand_chacha::ChaChaRng;
541    use sqlx::PgPool;
542
543    use crate::PgRepository;
544
545    #[sqlx::test(migrator = "crate::MIGRATOR")]
546    async fn test_app_repo(pool: PgPool) {
547        let mut rng = ChaChaRng::seed_from_u64(42);
548        let clock = MockClock::default();
549        let mut repo = PgRepository::from_pool(&pool).await.unwrap();
550
551        // Create a user
552        let user = repo
553            .user()
554            .add(&mut rng, &clock, "john".to_owned())
555            .await
556            .unwrap();
557
558        let all = AppSessionFilter::new().for_user(&user);
559        let active = all.active_only();
560        let finished = all.finished_only();
561        let pagination = Pagination::first(10);
562
563        assert_eq!(repo.app_session().count(all).await.unwrap(), 0);
564        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
565        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
566
567        let full_list = repo.app_session().list(all, pagination).await.unwrap();
568        assert!(full_list.edges.is_empty());
569        let active_list = repo.app_session().list(active, pagination).await.unwrap();
570        assert!(active_list.edges.is_empty());
571        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
572        assert!(finished_list.edges.is_empty());
573
574        // Start a compat session for that user
575        let device = Device::generate(&mut rng);
576        let compat_session = repo
577            .compat_session()
578            .add(&mut rng, &clock, &user, device.clone(), None, false, None)
579            .await
580            .unwrap();
581
582        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
583        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
584        assert_eq!(repo.app_session().count(finished).await.unwrap(), 0);
585
586        let full_list = repo.app_session().list(all, pagination).await.unwrap();
587        assert_eq!(full_list.edges.len(), 1);
588        assert_eq!(
589            full_list.edges[0],
590            AppSession::Compat(Box::new(compat_session.clone()))
591        );
592        let active_list = repo.app_session().list(active, pagination).await.unwrap();
593        assert_eq!(active_list.edges.len(), 1);
594        assert_eq!(
595            active_list.edges[0],
596            AppSession::Compat(Box::new(compat_session.clone()))
597        );
598        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
599        assert!(finished_list.edges.is_empty());
600
601        // Finish the session
602        let compat_session = repo
603            .compat_session()
604            .finish(&clock, compat_session)
605            .await
606            .unwrap();
607
608        assert_eq!(repo.app_session().count(all).await.unwrap(), 1);
609        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
610        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
611
612        let full_list = repo.app_session().list(all, pagination).await.unwrap();
613        assert_eq!(full_list.edges.len(), 1);
614        assert_eq!(
615            full_list.edges[0],
616            AppSession::Compat(Box::new(compat_session.clone()))
617        );
618        let active_list = repo.app_session().list(active, pagination).await.unwrap();
619        assert!(active_list.edges.is_empty());
620        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
621        assert_eq!(finished_list.edges.len(), 1);
622        assert_eq!(
623            finished_list.edges[0],
624            AppSession::Compat(Box::new(compat_session.clone()))
625        );
626
627        // Start an OAuth2 session
628        let client = repo
629            .oauth2_client()
630            .add(
631                &mut rng,
632                &clock,
633                vec!["https://example.com/redirect".parse().unwrap()],
634                None,
635                None,
636                None,
637                vec![GrantType::AuthorizationCode],
638                Some("First client".to_owned()),
639                Some("https://example.com/logo.png".parse().unwrap()),
640                Some("https://example.com/".parse().unwrap()),
641                Some("https://example.com/policy".parse().unwrap()),
642                Some("https://example.com/tos".parse().unwrap()),
643                Some("https://example.com/jwks.json".parse().unwrap()),
644                None,
645                None,
646                None,
647                None,
648                None,
649                Some("https://example.com/login".parse().unwrap()),
650            )
651            .await
652            .unwrap();
653
654        let device2 = Device::generate(&mut rng);
655        let scope = Scope::from_iter([OPENID, device2.to_scope_token().unwrap()]);
656
657        // We're moving the clock forward by 1 minute between each session to ensure
658        // we're getting consistent ordering in lists.
659        clock.advance(Duration::try_minutes(1).unwrap());
660
661        let oauth_session = repo
662            .oauth2_session()
663            .add(&mut rng, &clock, &client, Some(&user), None, scope)
664            .await
665            .unwrap();
666
667        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
668        assert_eq!(repo.app_session().count(active).await.unwrap(), 1);
669        assert_eq!(repo.app_session().count(finished).await.unwrap(), 1);
670
671        let full_list = repo.app_session().list(all, pagination).await.unwrap();
672        assert_eq!(full_list.edges.len(), 2);
673        assert_eq!(
674            full_list.edges[0],
675            AppSession::Compat(Box::new(compat_session.clone()))
676        );
677        assert_eq!(
678            full_list.edges[1],
679            AppSession::OAuth2(Box::new(oauth_session.clone()))
680        );
681
682        let active_list = repo.app_session().list(active, pagination).await.unwrap();
683        assert_eq!(active_list.edges.len(), 1);
684        assert_eq!(
685            active_list.edges[0],
686            AppSession::OAuth2(Box::new(oauth_session.clone()))
687        );
688
689        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
690        assert_eq!(finished_list.edges.len(), 1);
691        assert_eq!(
692            finished_list.edges[0],
693            AppSession::Compat(Box::new(compat_session.clone()))
694        );
695
696        // Finish the session
697        let oauth_session = repo
698            .oauth2_session()
699            .finish(&clock, oauth_session)
700            .await
701            .unwrap();
702
703        assert_eq!(repo.app_session().count(all).await.unwrap(), 2);
704        assert_eq!(repo.app_session().count(active).await.unwrap(), 0);
705        assert_eq!(repo.app_session().count(finished).await.unwrap(), 2);
706
707        let full_list = repo.app_session().list(all, pagination).await.unwrap();
708        assert_eq!(full_list.edges.len(), 2);
709        assert_eq!(
710            full_list.edges[0],
711            AppSession::Compat(Box::new(compat_session.clone()))
712        );
713        assert_eq!(
714            full_list.edges[1],
715            AppSession::OAuth2(Box::new(oauth_session.clone()))
716        );
717
718        let active_list = repo.app_session().list(active, pagination).await.unwrap();
719        assert!(active_list.edges.is_empty());
720
721        let finished_list = repo.app_session().list(finished, pagination).await.unwrap();
722        assert_eq!(finished_list.edges.len(), 2);
723        assert_eq!(
724            finished_list.edges[0],
725            AppSession::Compat(Box::new(compat_session.clone()))
726        );
727        assert_eq!(
728            full_list.edges[1],
729            AppSession::OAuth2(Box::new(oauth_session.clone()))
730        );
731
732        // Query by device
733        let filter = AppSessionFilter::new().for_device(&device);
734        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
735        let list = repo.app_session().list(filter, pagination).await.unwrap();
736        assert_eq!(list.edges.len(), 1);
737        assert_eq!(
738            list.edges[0],
739            AppSession::Compat(Box::new(compat_session.clone()))
740        );
741
742        let filter = AppSessionFilter::new().for_device(&device2);
743        assert_eq!(repo.app_session().count(filter).await.unwrap(), 1);
744        let list = repo.app_session().list(filter, pagination).await.unwrap();
745        assert_eq!(list.edges.len(), 1);
746        assert_eq!(
747            list.edges[0],
748            AppSession::OAuth2(Box::new(oauth_session.clone()))
749        );
750
751        // Create a second user
752        let user2 = repo
753            .user()
754            .add(&mut rng, &clock, "alice".to_owned())
755            .await
756            .unwrap();
757
758        // If we list/count for this user, we should get nothing
759        let filter = AppSessionFilter::new().for_user(&user2);
760        assert_eq!(repo.app_session().count(filter).await.unwrap(), 0);
761        let list = repo.app_session().list(filter, pagination).await.unwrap();
762        assert!(list.edges.is_empty());
763    }
764}