CNK's Blog

Hostnames and Aliases

Our multitenant Wagtail setup has made a lot if things smoother compared to the multi-instance setup it replaced. But there are a couple of situations that were easier in the old setup. For example, how do we set up a new site while the existing site is still live. Or when organizations change their names and want their url updated to the new acronym - but still want the old one to work.

Site Aliases

Our solution to these problems is twofold. First, create all sites as subdomains of the installation’s hostname; that takes care of building a new site while the current one is still live. But then we need to be able to assign the real name to the new site. To take care of that, we have a SiteAlias model to associate additional names with a site.

So if the hostname for our install is sites.example.com, then we create new sites as xyz.sites.example.com, etc. We have a wildcard DNS mapping and a wildcard SSL certificate for *.sites.example.com, so once we create a site, it is available on the public internet as https://xyz.sites.example.com. When the customer is ready for this site to be live with their preferred names, e.g. xyz.example.com, we can add the new name to our SiteAliases, request a DNS mapping and a new SSL certificate. Then the site is live with both names. Below is the code we use for mapping requests to sites.

    def match_site_to_request(request):
        """
        Find the Site object responsible for responding to this HTTP request object. Try in this order:
        * unique hostname
        * unique site alias

        If there is no matching hostname or alias for any Site, Site.DoesNotExist is raised.

        This function returns a tuple of (match_type_string, Site), where match type can be 'hostname' or 'alias'.
        It also pre-selects as much as it can from the Site and Settings, to avoid needless separate queries for things
        that will be looked at on most requests.

        This function may throw either MissingHostException or Site.DoesNotExist. Callers must handle those appropriately.
        """
        query = Site.objects.select_related('settings', 'root_page', 'features')

        if 'HTTP_HOST' not in request.META:
            # If the HTTP_HOST header is missing, this is an improperly configured test; any on-spec HTTP client will include it.
            raise MissingHostException()

        # Get the hostname. Strip off any port that might have been specified, since this function doesn't need it.
        hostname = split_domain_port(request.get_host())[0]
        try:
            # Find a Site matching this specified hostname.
            return ['hostname', query.get(hostname=hostname)]
        except Site.DoesNotExist:
            # This catches "no Site exists with this canonical hostname", now check if 'hostname' matches an alias.
            # Site.DoesNotExist will be raised if 'hostname' doesn't match an alias either
            return ['alias', query.get(settings__aliases__domain=hostname)]

So all problems solved, right? Not quite.

Preferred Domains

Now that we have multiple domain names mapped to the same site, we have an SEO problem. Ideally each site should have one and only one canonical name, so we designate one of our aliases as the preferred domain name - and then have a site middleware that redirects requests to the https version of that name.

    class MultitenantSiteMiddleware(MiddlewareMixin):

        def process_request(self, request):
            """
            Set request._wagtail_site to the Site object responsible for handling this request. Wagtail's version of this
            middleware only looks at the Sites' hostnames. Ours must also consider the Sites' lists of aliases.
            """
            try:
                # We store the Site in request._wagtail_site to avoid having to patch Wagtail's Site.find_for_request
                match_type, request._wagtail_site = match_site_to_request(request)
            except Site.DoesNotExist:
                # This will trigger if no Site matches the request. We raise a 404 so that the user gets a useful message.
                # We provide the default site as request._wagtail_site, though, just in case a template(tag) that gets
                # rendered on the 404 page expects Site.find_for_request() to actually return a Site (rather than None).
                request._wagtail_site = Site.objects.get(is_default_site=True)
                raise Http404()
            except MissingHostException:
                # If no hostname was specified, we return a 400 error. This only happens in tests.
                return HttpResponseBadRequest("No HTTP_HOST header detected. Site cannot be determined without one.")

            # Grab the site we just assigned, using the Wagtail method, to ensure the Wagtail method will work later.
            current_site = Site.find_for_request(request)

            # Determine how the user arrived here, so we can redirect them as needed.
            arrival_domain, arrival_port = split_domain_port(request.get_host())
            # If an empty port was returned from split_domain_port(), we know it's either 80 or 443.
            if not arrival_port:
                arrival_port = 80 if not request.is_secure() else 443

            # If a user visits any site via http://, and we can be 100% sure that an https-compatible version of that site
            # exists, redirect to it automatically.
            if not request.is_secure() and is_ssl_domain(arrival_domain, request):
                target_domain = get_public_domain_for_site(current_site)
                target_url = f'https://{target_domain}{request.get_full_path()}'
                logger.info(
                    'https.auto-redirect',
                    arrival_url=f'http://{arrival_domain}{request.get_full_path()}',
                    target_url=target_url
                )
                # Issue a permanent redirect, so that search engines know that the http:// URL isn't valid.
                return HttpResponsePermanentRedirect(target_url)

    def get_public_domain_for_site(site):
        """
        Returns the public-facing domain for this site
        """
        return site.settings.preferred_domain or site.hostname


    def is_ssl_domain(domain, request):
        """
        Returns True if the given domain name is guaranteed to match our SSL certs after being run through
        get_public_domain_for_site().
        """
        # We know the domain matches our SSL certs post-get_public_domain_for_site() if one of two things is true:

        # 1. The current site has a preferred_domain set. We know this implies SSL compatibility because the Site Settings
        # form prevents a non-SSL-compatible preferred_domain from being set.
        current_site = Site.find_for_request(request)
        if current_site.settings.preferred_domain:
            return True

        # 2. If the given domain matches any of our SSL wildcard domains in the way that SSL counts as a match,
        # e.g. *.example.com matches xyz.example.com but not www.xyz.example.com.
        for wildcard_domain in settings.SSL_WILDCARD_DOMAINS:
            if re.match(rf'^[^.]+\.{wildcard_domain}$', domain):
                return True

        return False

Relative urls

So now all requests are going to be going to the preferred domain right? Sadly, no. Despite all advice to use page and document choosers when creating links within a site, our content editors often copy and paste links from the browser’s address bar instead. Unfortunately, that often leads to links with the xyz.sites.example.com domain name - particularly for sites that are built while an old site is still live. So we have code that allows us to always store relative urls for links within a site. When a page is saved, a new Revision is created. Before that revision is saved, we convert any links to “our” domains into relative links and store that instead.

    @receiver(pre_save)
    def postprocess_links(sender, instance, raw, using, update_fields, **kwargs):
        """
        To ensure that copy-pasted URLs always point to the correct site, we remove the scheme and domain from any URLs
        which include the site's hostname or any of its aliases, converting them into relative URLs.
        """
        if sender == Revision:
            domains = get_domains_for_current_site()
            content_string = json.dumps(instance.content, cls=DjangoJSONEncoder)
            updated_string = domain_erase(domains, content_string)
            instance.content = json.loads(updated_string, object_hook=_decode_revision_datetimes)


    def get_domains_for_current_site():
        """
        Returns the list of domains associated with the current site. If there is no current site, returns empty list.
        """
        request = get_current_request()
        site = Site.find_for_request(request)
        alias_domains = []
        if site:
            alias_domains.append(site.hostname)
            try:
                alias_domains.extend([alias.domain for alias in site.settings.aliases.all()])
            except ObjectDoesNotExist:
                # This is a generic "except" because core can't know which settings class's DoesNotExist might get thrown.
                pass
        return alias_domains


    def domain_erase(domains, text):
        """
        Removes all instances of the specified domains from the links in the given text.
        """
        # Do nothing if given an empty list of domains. Otherwise, we'll get mangled output.
        if not domains:
            return text

        # Create a regular expression from the domains, e.g. (https?://blah\.com/?|https?://www\.blue\.com/?).
        escaped_domains = [f"https?://{re.escape(domain)}/?" for domain in domains]
        regex = '(' + "|".join(escaped_domains) + ')'
        # Replace each match with a /. This regex will convert https://www.example.com/path to /path and
        # https://www.example.com into /. Since this is just about converting local URLs, that's the appropriate conversion
        # for path-less ones.
        replaced = re.sub(regex, '/', text)
        return replaced

Running TOX locally

TL;DR Install tox and virtualenv-pyenv python packages. In the shell, export VIRTUALENV_DISCOVERY=pyenv. And in the tox.ini add the following to the setenv section: VIRTUALENV_DISCOVERY=pyenv


I needed to to do some long deferred maintenance on the wagtail-hallo package we still use. I have neglected it for so long I feel like I need to test it on several different versions of Wagtail. The package’s CI setup already runs some basic python tests for several combinations Python, Django, Wagtail, and 2 databases. So I thought I would start by running those automated tests locally - and then move onto browser testing.

So how do I run tox locally? Per usual, there is the official documentation which is thourough and overwhelming. But this one pager from the OpenAstronomy Python Packaging Guide was much more what I needed to get started. That explained the structure of my existing tox.ini file and showed me how to run individual test environments - or all of them. I didn’t want to have to set up postgres so the first thing I did was to remove postgres from the database options - leaving only the sqlite environments. Then I did pip install tox into the virtual environment I use for developing Wagtail - currently Python 3.12.5, Django 5.0, and wagtail plus its dependencies from sha a5761bc2a961a8c91e5482d2e301191f617fe3d4.

The first time I ran tox, some of the sets ran but most of them said they couldn’t find an appropriate python - even for pythons I know I have installed in my pyenv setup. ChatGPT told me I needed to install tox-pyenv but when I looked at its GitHub README, that project is archived and it tells me I need a different package: virtualenv-pyenv.

After a little bit of searching, I found virtualenv-pyenv and was able to install it: pip install virtualenv-pyenv. I added the required environment variable to my bash profile: VIRTUALENV_DISCOVERY=pyenv and then edited the tox.ini file to add a new environment variable:

    setenv =
        postgres: DATABASE_URL={env:DATABASE_URL:postgres:///wagtail_hallo}
        VIRTUALENV_DISCOVERY=pyenv

So at this point, I have the following tox-related packages in my virtual environment:

   tox==4.23.2
   virtualenv==20.28.0
   virtualenv-pyenv==0.5.0

And now when I ran tox again, most of my tests ran:

    python3.8-django3.2-wagtail3.0-sqlite: SKIP (0.01 seconds)
    python3.9-django3.2-wagtail3.0-sqlite: SKIP (0.00 seconds)
    python3.10-django3.2-wagtail3.0-sqlite: OK (15.19=setup[10.32]+cmd[4.87] seconds)
    python3.9-django4.1-wagtail4.2-sqlite: SKIP (0.00 seconds)
    python3.10-django4.1-wagtail4.2-sqlite: OK (15.41=setup[8.17]+cmd[7.25] seconds)
    python3.11-django4.1-wagtail4.2-sqlite: OK (15.46=setup[8.02]+cmd[7.44] seconds)
    python3.10-django4.2-wagtail5.2-sqlite: OK (14.53=setup[7.81]+cmd[6.72] seconds)
    python3.10-django4.2-wagtail6.1-sqlite: OK (14.67=setup[7.50]+cmd[7.17] seconds)
    python3.10-django5.0-wagtail5.2-sqlite: OK (14.43=setup[7.42]+cmd[7.01] seconds)
    python3.10-django5.0-wagtail6.1-sqlite: OK (15.05=setup[7.38]+cmd[7.67] seconds)
    python3.11-django4.2-wagtail5.2-sqlite: OK (14.03=setup[7.36]+cmd[6.66] seconds)
    python3.11-django4.2-wagtail6.1-sqlite: OK (14.32=setup[7.08]+cmd[7.24] seconds)
    python3.11-django5.0-wagtail5.2-sqlite: OK (14.19=setup[7.07]+cmd[7.12] seconds)
    python3.11-django5.0-wagtail6.1-sqlite: OK (14.74=setup[7.11]+cmd[7.63] seconds)
    python3.12-django4.2-wagtail5.2-sqlite: OK (6.48=setup[0.01]+cmd[6.47] seconds)
    python3.12-django4.2-wagtail6.1-sqlite: OK (6.81=setup[0.01]+cmd[6.81] seconds)
    python3.12-django5.0-wagtail5.2-sqlite: OK (6.67=setup[0.01]+cmd[6.66] seconds)
    python3.12-django5.0-wagtail6.1-sqlite: OK (7.46=setup[0.01]+cmd[7.45] seconds)
    congratulations :) (189.50 seconds)

Locally I don’t care about python 3.8 and 3.9, so I am just going to ignore them. The messages in the console appear to indicate that pyenv doesn’t have packages for those older pythons:

    skipped because could not find python interpreter with spec(s): python3.9
    only CPython is currently supported

Reports and Filters

Most of the heavy lifting for our multitenancy changes is taken care of by our permission patches. But there are still a few places where we need to filter items by site - or remove an explicit site filter.

Page Explorer Filters

Wagtail 6.0, introduced “Universal Listings”, a way to combine full text search with a series of filters to hone in on the content you want to edit. One of the included filters is a filter for the site - but we never want someone navigating between sites even if they have permissions to edit more than one site. So we’ll want to remove the site filter. We also need to pare down the filters that offer you a list of users. Out of the box, these filters will list everyone who has edited a page, or has unlock permission across the entire installation. We need to limit these filters to only users who belong to one of the current site’s groups.

As discussed in Monkey Patching Wagtail, to patch a filter, we need to alter the view that uses it. The filterset_class is an attribute of the index view and the easiest way to alter the view is to subclass it and then map your subclass to the same url as the original view. Let me give you the code snippets in the opposite direction. Starting from the url and working our way down through the view to the filters.

    # patched_urls.py
    from .views.page_explorer import MultitenantPageIndexView

    patched_wagtail_urlpatterns = [
        # This overrides the wagtailadmin_explore_page (aka page listing view) so we can monkey patch the filters
        path('admin/pages/', MultitenantPageIndexView.as_view()),
        path('admin/pages/<int:parent_page_id>/', MultitenantPageIndexView.as_view()),
    ]

In MultitenantPageIndexView, you can override whatever you need to to change the PageExplorer. Our permission patches take care of limiting the pages to the current site, so the only thing we need to change is the filters. This is done by setting the filterset_class attribute.

    # wagtail_patches/views/page_explorer.py
    from wagtail.admin.views.pages.listing import IndexView

    class MultitenantPageIndexView(IndexView):
        filterset_class = MultitenantPageFilterSet

OK now, finally, let’s mess with the filters. If we were only tweaking one thing, it might be easier to subclass the existing PageFilterSet and change a specific method. But given the number of changes, including removing the site attribute, I thought it was clearer to just copy the PageFilterSet logic into my function and then alter it.

Our init method pulls up some of the “infer the base queryset” information from django_filters to enforce starting with pages from this site. Then I patched the 2 filters that provide a list of users who have performed some action. And finally, I omitted the site filter all together.

    # wagtail_patches/views/page_explorer.py
    class MultitenantPageFilterSet(WagtailFilterSet):
        def __init__(self, data=None, queryset=None, *, request=None, prefix=None):
            # BEGIN PATCH/Override
            request = request or get_current_request()
            # If we weren't sent the request, and couldn't get it from the middleware, we return nothing.
            if not request:
                queryset = Page.objects.none()
            else:
                root_path = Site.find_for_request(request).root_page.path
                queryset = Page.objects.filter(path__startswith=root_path)
            # END PATCH
            super().__init__(data, queryset, request=request, prefix=prefix)

        content_type = MultipleContentTypeFilter(
            label=_("Page type"),
            queryset=lambda request: get_page_content_types_for_theme(request, include_base_page_type=False),
            widget=CheckboxSelectMultiple,
        )
        latest_revision_created_at = DateFromToRangeFilter(
            label=_("Date updated"),
            widget=DateRangePickerWidget,
        )
        owner = MultipleUserFilter(
            label=_("Owner"),
            queryset=(
                lambda request: get_user_model().objects.filter(
                    # BEGIN PATCH
                    pk__in=Page.objects.descendant_of(Site.find_for_request(request).root_page)
                    # END PATCH
                    .values_list("owner_id", flat=True)
                    .distinct()
                )
            ),
            widget=CheckboxSelectMultiple,
        )
        edited_by = EditedByFilter(
            label=_("Edited by"),
            queryset=(
                lambda request: get_user_model().objects.filter(
                    pk__in=PageLogEntry.objects
                    # BEGIN PATCH
                    .filter(page__path__startswith=Site.find_for_request(request).root_page.path, action="wagtail.edit")
                    # END PATCH
                    .order_by()
                    .values_list("user_id", flat=True)
                    .distinct()
                )
            ),
            widget=CheckboxSelectMultiple,
        )
        has_child_pages = HasChildPagesFilter(
            label=_("Has child pages"),
            empty_label=_("Any"),
            choices=[
                ("true", _("Yes")),
                ("false", _("No")),
            ],
            widget=RadioSelect,
        )

        class Meta:
            model = Page
            fields = []  # only needed for filters being generated automatically

Reports

In a number of cases, we need to make similar patches to our report views - to limit the pages or users offered in the filter widget. Here is our current set of overridden report views:

    # patched_urls.py
    # Inside the urlpatterns list... these override the wagtailadmin_reports:* views.
    path('admin/reports/locked/', MultitenantLockedPagesView.as_view()),
    # two views related to page type use
    path('admin/reports/page-types-usage/', MultitenantPageTypesUsageReportView.as_view()),
    path('admin/pages/usage/<slug:content_type_app_name>/<slug:content_type_model_name>/',
         MultitenantContentTypeUseView.as_view()),
    path('admin/reports/site-history/', MultitenantSiteHistoryView.as_view()),
    # This overrides the wagtailadmin_pages:history view.
    path('admin/pages/<int:page_id>/history/', MultitenantPageHistoryView.as_view()),

Locked Pages

The locked pages report needed 2 changes - the first to remove instances the user may have permission to edit but which is not in the current site. The second one customizes the filter to restrict the list of users displayed in the filter to only those who have locked pages on this site.

    # wagtail_patches/views/reports/locked_pages.py
    def site_specific_get_users_for_filter():
        """
        Only show users who have locked pages on the current Site.
        """
        request = get_current_request()
        # If we weren't sent the request, and couldn't get it from the middleware, we have to give up and return nothing.
        if not request:
            return get_user_model().objects.none()

        site = Site.find_for_request(request)
        User = get_user_model()
        return User.objects.filter(
            locked_pages__isnull=False,
            groups__name__startswith=site.hostname
        ).distinct().order_by(User.USERNAME_FIELD)


    class MultitenantLockedPagesReportFilterSet(LockedPagesReportFilterSet):
        locked_by = django_filters.ModelChoiceFilter(
            field_name="locked_by", queryset=lambda request: site_specific_get_users_for_filter()
        )


    class MultitenantLockedPagesView(LockedPagesView):
        filterset_class = MultitenantLockedPagesReportFilterSet

        def get_queryset(self):
            # BEGIN PATCH
            # The original had an "OR locked by you" that we needed to get rid of
            pages = (
                PagePermissionPolicy().instances_user_has_permission_for(
                    self.request.user, "change"
                ).filter(locked=True)
                .specific(defer=True)
            )

            self.queryset = pages
            # Skip Wagtail's version of LockedPagesView and go to its parent PageReportView
            return super(LockedPagesView, self).get_queryset()
            # END PATCH

Page Type Usage

There is a new PageTypes report that arrived in Wagtail 6.0. We need to restrict its counts to the pages on this site. This view only needed the base queryset changed but I also wanted to remove the site list (we don’t want to leak that information to owners of other sites). And we are don’t use internationalization, so I wanted to get rid of the filters completely. The way I did this was kind of hacky. If I only set the filterset_class to None, it was still getting called - so I monkey patched all of the report’s get_queryset and removed the part that was calling for the existing queryset to be filtered by our useless site and local options.

    #  wagtail_patches/views/reports/page_usage.py
    class MultitenantPageTypesUsageReportView(PageTypesUsageReportView):
        # BEGIN PATCH
        filterset_class = None
        # END PATCH

        def get_queryset(self):
            # BEGIN PATCH
            page_models = get_page_models_for_theme(self.request)
            queryset = ContentType.objects.filter(
                model__in=[model.__name__.lower() for model in page_models]
            ).all()

            # Removed code for multisite support and removed locale & site filters
            # Cheat and hard-code filter values for locale and site to search the current site only.
            language_code = None
            site_root_path = Site.find_for_request(self.request).root_page.path
            # END PATCH

            queryset = _annotate_last_edit_info(queryset, language_code, site_root_path)

            queryset = queryset.order_by("-count", "app_label", "model")

            return queryset


    class MultitenantContentTypeUseView(ContentTypeUseView):

        def get_queryset(self):
            # BEGIN PATCH
            root_page = Site.find_for_request(self.request).root_page
            return self.page_class.objects.descendant_of(root_page, inclusive=True).all().specific(defer=True)
            # END PATCH

Site History report

Wagtail’s site history report tracks changes for all pages and snippet models. Per usual, we only want to show information for the current site. It is relatively easy to do this for pages - we can filter using the page tree. In theory we could also filter snippet model information using the site_id but that would involve writing a gigantic query that joined the model logging table to all the model tables. That isn’t feasible so we only show model history to superusers who can see all of the information anyway.

    def site_specific_base_viewable_by_user(self, user):
        if user.is_superuser:
            return self.all()
        else:
            return self.none()
    from wagtail.models import BaseLogEntryManager  # noqa
    BaseLogEntryManager.viewable_by_user = site_specific_base_viewable_by_user

The site history page has a filter by content types, so we need to remove all the non-page models for normal users.

    class MultitenantSiteHistoryView(LogEntriesView):
        """
        We force this view to be used in place of LogEntriesView through the use of wagtail_patches/patched_urls.py.
        """
        filterset_class = MultitenantSiteHistoryReportFilterSet


    class MultitenantSiteHistoryReportFilterSet(SiteHistoryReportFilterSet):
        user = django_filters.ModelChoiceFilter(field_name='user', queryset=site_specific_users_who_have_edited_pages)
        object_type = ContentTypeFilter(
            label='Type',
            method='filter_object_type',
            queryset=lambda request: site_specific_get_content_types_for_filter(),
        )


    def site_specific_get_content_types_for_filter():
        """
        This is a tweaked version of wagtail.admin.views.reports.audit_logging.get_content_types_for_filter() that
        only returns Page content types, unless the user is a Superuser, and thus allowed to edit Snippets directly.
        """
        content_type_ids = set()
        for log_model in registry.get_log_entry_models():
            request = get_current_request()
            if log_model.__name__ == 'PageLogEntry' or (request and request.user.is_superuser):
                content_type_ids.update(log_model.objects.all().get_content_type_ids())

        return ContentType.objects.filter(pk__in=content_type_ids).order_by('model')

    def site_specific_users_who_have_edited_pages(request):
        """
        Only show users who have modified pages on the current Site.
        """
        request = request or get_current_request()
        # If we weren't sent the request, and couldn't get it from the middleware, we have to give up and return nothing.
        if not request:
            return get_user_model().objects.none()

        root_path = Site.find_for_request(request).root_page.path
        user_pks = set(PageLogEntry.objects.filter(page__path__startswith=root_path).values_list('user__pk', flat=True))
        return get_user_model().objects.filter(pk__in=user_pks).order_by('last_name')

Page History view

History information for pages is also available from a link on the edit form sidebar and in the page listing. To make sure that is only allowing history for pages on the current site, we replaced the get_object_or_404 with equivalent code that checks the page belongs to the site. And we patched the filters to use the same user query as above.

    class MultitenantPageHistoryView(PageHistoryView):
        """
        This subclass reports the Page history for only the current Site, rather than the entire server.
        We force this view to be used in place of PageHistoryView through the use of wagtail_patches/patched_urls.py.
        """
        filterset_class = MultitenantPageHistoryReportFilterSet

        @method_decorator(user_passes_test(user_has_any_page_permission))
        def dispatch(self, request, *args, **kwargs):
            # BEGIN PATCH
            # Unwrap get_object_or_404 so we can adjust the query
            root_page = Site.find_for_request(request).root_page
            page = Page.objects.filter(pk=kwargs['page_id']).descendant_of(root_page, inclusive=True).first()
            if page:
                self.page = page.specific
            else:
                raise Http404("No page matches the given query.")
            # END PATCH

            return super(PageHistoryView, self).dispatch(request, *args, **kwargs)

    class MultitenantPageHistoryReportFilterSet(PageHistoryReportFilterSet):
        # This class lets us redefine user's 'queryset' callable to the same one as MultitenantSiteHistoryReportFilterSet.
        user = django_filters.ModelChoiceFilter(field_name='user', queryset=site_specific_users_who_have_edited_pages)

Snippet Choosers

Continuing with our Location snippet from our previous post, we want to use locations in our event pages. So we need to be able to choose locations - but only locations entered into the current site - and we need to enforce the “same site” restriction in our foreign key relationships. Fortunately Django already supports using functions to create a list of valid options for choosers. So in our case, we need a function that does not take any arguments and returns the dictionary Django needs to build the queryset filter. See the Django docs for details.

Relationships

Our EventPage has a foreign key relationship with Location and we use a helper method to restrict the choices offered to locations in the same site. The help looks like this:

    def limit_to_current_site():
        """
        Use this function to limit a dropdown that lists models that reference a Site to those instances that reference
        the current Site.
        """
        request = get_current_request()
        if request:
            return {'site': Site.find_for_request(request)}
        else:
            # If we do not have a request, rely on the validations that ran when inserting this data.
            # NB: Our imports must be sure they are setting up the foreign key relations to data in the current site.
            return Q()

And then we use it in our page model definition like this.

    class EventPage(Page):
        start_date = models.DateTimeField('Start Date/Time')
        end_date = models.DateTimeField('End Date/Time')
        location = models.ForeignKey(
            'map.Location',
            blank=True,
            null=True,
            limit_choices_to=limit_to_current_site,
            on_delete=models.SET_NULL,
        )
        description = RichTextField(editor='minimal', blank=True)

        content_panels = Page.content_panels + [
                FieldRowPanel(
                    classname='datetimes-field date-field',
                    children=[
                        FieldPanel('start_date', classname='start-date'),
                        FieldPanel('end_date', classname='end-date')
                    ]
                ),
                FieldPanel(
                    'location',
                    widget=autocomplete.ModelSelect2(
                        url='map:location_autocomplete',
                        attrs={'data-placeholder': 'Search for Locations...'}
                    )
                ),
                FieldPanel('description'),
        ]

Autocomplete views

You will notice that we have specified a widget in the location FieldPanel. This is because we have too many locations in some sites to easily use a <select> input field. The code above enforces the site restriction for the foreign key relationship but we will need a custom view to allow editors to search for appropriate locations.

    # views.py
    from dal import autocomplete

    class LocationAutocompleteView(autocomplete.Select2QuerySetView):
        """
        An autocompleter that returns Location objects for use in the various forms.
        """
        paginate_by = None

        def get_queryset(self):
            # Start with all of the Locations for the site.
            site = Site.find_for_request(self.request)
            queryset = Location.objects.filter(site=site)

            # If the user has typed anything into the autocomplete widget, filter the queryset down to Locations that match.
            if self.q:
                queryset = queryset.filter(name__icontains=self.q)

            return queryset

    # Then in our urls.py we have this line to add the url
    path('location_autocomplete', never_cache(views.LocationAutocompleteView.as_view()), name='location_autocomplete'),
    # Then this url mapping is used as the `url` arg for the autocomplete widget in the form above.

Choosers

Wagtail snippets also provide chooser views to select instances of a model or create an instance if a suitable one does not already exist. Once again, we need to only offer instances from the current site to be associated with other models on the site. We are currently using the older wagtail-generic-chooser package so we created a mixin to take care of filtering by site.

I’ll update this code once we have converted to using the built-in ChooserViewSet. I think we should be able to subclass ChooserViewSet, customize get_object_list, and then follow the rest of the instructions but I haven’t tried it yet.

    # utils.py
    from generic_chooser.views import ModelChooserMixin, ModelChooserViewSet

    class SiteSpecificChooserMixin(ModelChooserMixin):
        """
        Use this ChooserMixin for Site-specific models, to ensure that users can only choose instances of that model
        belonging to the current Site.
        """

        def get_unfiltered_object_list(self):
            objects = super().get_unfiltered_object_list()
            return objects.filter(site=Site.find_for_request(self.request))

    # views.py
    class LocationChooserViewSet(ModelChooserViewSet):
    """
    This viewset defines the views that ae used to choose (and create, from within the chooser) Location objects.
    To make use of them, you must specify widget=LocationChooser in your FieldPanel for the Location field, or use an
    LocationChooserBlock in your StreamField block definition.
    """
    icon = 'map'
    model = Location
    page_title = 'Choose a Location'
    per_page = 40
    order_by = 'name'
    form_class = LocationModelForm
    chooser_mixin_class = SiteSpecificChooserMixin

    # forms.py
    class LocationModelForm(SiteSpecificModelForm):
    """
    wagtail-generic-choosers expects an _actual_ ModelForm, rather than a pseudo-ModelForm that Wagtail lets
    you use. This Form class manually specifies the Model it's for and the fields it presents, because that's the
    default way that it works in Django, and wagtail-generic-choosers expects that.
    """

    class Meta:
        model = Location
        fields = [ 'name', 'building_name', 'room_number']

Snippets - CRUD

In addition to the models that Wagtail provides (pages, images, and documents), most web sites also need other models. The easiest way to manage those in the Wagtail admin is to register them as “snippets”. Like all other assets in our multitenant install, we only want people to manage the snippet instances for their own site.

Example

We have maps on our sites and we store the data for map locations via a Django model. We use a SnippetViewSet to manage locations in the Wagtail admin interface. These same locations are also used by our EventPages as the location for the event - so we also need site-specific choosers to associate events and locations with instances on the same site.

Models

    # models.py
    class Location(Orderable, models.Model):
        """
        Represents a location at which an Event can take place.
        """
        name = models.CharField('Location Name', max_length=1024)
        building_name = models.CharField('Building Name', max_length=255, blank=True)
        room_number = models.CharField('Room Number', max_length=255, blank=True)

        # Associate each Location with a particular Site, so that editing Locations on one Site
        # doesn't affect other Sites.
        site = models.ForeignKey(
            Site,
            related_name='locations',
            on_delete=models.CASCADE
        )

        class Meta:
            ordering = ['name']

        def __str__(self):
            return self.name

The only thing that makes this model special is that we have a ForeignKey relationship with the Wagtail Sites table.

Views / ViewSets

Because we have a a bunch of site-specific models, we have a couple of view-level classes that help us manage the site segregation. In the code below, note that we inherit from a custom ViewSet and that while our panels list doesn’t include the site_id, we are using SiteSpecificModelForm as the base class for our form.

    # wagtail_hooks.py
    class LocationViewSet(MultitenantSnippetViewSet):
        """
        This class defines the SnippetViewSet for Location, which is accessed from the Map menu defined below.
        """
        model = Location
        menu_label = 'Locations'
        menu_order = 100
        list_display = ['name', 'building_name', 'room_number']
        search_fields = ['name', 'building_name', 'room_number']
        icon = 'location-arrow'
        url_prefix = 'map/locations'

        panels = [
            MultiFieldPanel(
                heading='Location',
                children=[
                    FieldPanel('name'),
                    FieldPanel('building_name'),
                    FieldPanel('room_number'),
                ]
            )
        ]
        edit_handler = ObjectList(panels, base_form_class=SiteSpecificModelForm)

Our MultitenantSnippetViewSet takes care of adding a filter so the listing view for each site only displays items for that one site. It also has some code that makes it easier to manage whether or not to add a menu item for managing the model.

    class MultitenantSnippetViewSet(SnippetViewSet):
        """
        We subclass SnippetViewSet to apply some functionality that nearly all our of SnippetViewSets need, and to
        simplify some other functionality.
        """

        def get_queryset(self, request):
            """
            Every model that uses MultitenantSnippetViewSet is a Site-specific model, so we need to filter the listing
            to show only those instances that belong to the current Site.
            """
            return self.model._default_manager.filter(site=Site.find_for_request(request))

        def hide_menu_item(self, request):  # noqa
            """
            Override hide_menu_item() to return True when the menu item for this SnippetViewSet should be hidden.
            The logic for this is combined with the permissions-based display logic that's built in to SnippetViewSet.
            This gets called by the is_shown() method in CustomMenuItem, defined inside get_menu_item() below.
            """
            return False

        def get_menu_item(self, order=None):
            """
            We override this method to apply custom is_shown() logic to this ViewSet's menu item.
            """
            # We subclass self.menu_item_class, which implements permissions checking in is_shown(), so that our code can
            # call super().is_shown() to get the "default" display permissions. We do that after determining if we need to
            # be even more strict than that for this class, via hide_menu_item().
            class CustomMenuItem(self.menu_item_class):
                # Assigning CustomMenuItem.hide_menu_item to MultitenantSnippetViewSet.hide_menu_item lets
                # CustomMenuItem.is_shown() access hide_menu_item() as a normal instance method. This will work even for
                # overriden versions of hide_menu_item() in subclasses of MultitenantSnippetViewSet.
                hide_menu_item = self.hide_menu_item

                def is_shown(self, request):
                    """
                    If self.hide_menu_item() returns True, hide this menu item.
                    Otherwise, permissions control its visibility.
                    """
                    if self.hide_menu_item(request):
                        return False
                    return super().is_shown(request)

            return CustomMenuItem(
                label=self.menu_label,
                url=self.menu_url,
                name=self.menu_name,
                icon_name=self.menu_icon,
                order=order or self.menu_order,
            )

NOTE: Until GitHub issue 10746 is resolved, the filter in get_queryset only limits access for the list view; it does not prevent someone from accessing the edit view for an item on a different site. You might want to subclass the SnippetEditView so you can customize get_object. That would allow you to return a 404 page when the user tries to edit an object from another site. We didn’t do this. Instead we enforce ‘only edit on the correct site’ in the clean method of our SiteSpecificModelForm. This form class adds methods for ensuring items are created and edited on the site to which they belong. Instead of putting the site_id in the model form, we fill it in automatically when creating a model object - and then refuse to allow anything to move the object to a different site.

    # utils.py
    class SiteSpecificModelForm(WagtailAdminModelForm):
        """
        Generic form for use on models administered via Wagtail forms that need to generate site-specific objects.

        NOTE: The model's 'panels' list must NOT contain the 'site' field.
        """

        def clean(self):
            cleaned_data = super().clean()

            current_site = Site.find_for_request(get_current_request2('Current Site'))
            try:
                if self.instance and self.instance.site and self.instance.site != current_site:
                    raise ValidationError(
                        f'The Site associated with this {self.instance.__class__.__name__} is {self.instance.site}, but the'
                        f'current Site is {current_site}. Changing the Site of an existing object is not allowed.'
                    )
            except ObjectDoesNotExist:
                # We're in a create, so self.instance.site does not resolve.
                pass

            # If the model has a unique_together constraint that includes the site field, we need to implement the
            # validation for it here, since our shenanigans with that field break django's usual validation code.
            if hasattr(self._meta.model._meta, 'unique_together'):
                # unique_together gets stored as a tuple of tuples, so we need this outer loop to get to the field list.
                for constraint in self._meta.model._meta.unique_together:
                    if 'site' not in constraint:
                        continue
                    # Build a dict of args for the QuerySet.filter() method, using current_site for the 'site' arg.
                    filter_args = {field_name: cleaned_data.get(field_name) for field_name in constraint}
                    filter_args['site'] = current_site
                    # Check if an instance already exists with the unique_together data, and if so, set an error if that
                    # instance ISN'T the one that's currently being edited.
                    instance_in_db = self._meta.model.objects.filter(**filter_args).first()
                    if instance_in_db and instance_in_db != self.instance:
                        for field in [x for x in constraint if x != 'site']:
                            self.add_error(
                                field, ValidationError(f'A {self._meta.model.__name__} already exists with that {field}.')
                            )

            return cleaned_data

        def save(self, commit=True):
            instance = super().save(False)

            if not instance.site_id:
                # This is an instance that's being created for the first time, so we need to give it the current site.
                # Future updates can't change the site field, because it's not in the form.
                instance.site = Site.find_for_request(get_current_request2('Current Site'))

            # Some subclasses override this to do additional processing, if they do, they need to call super().save(False)
            if commit:
                instance.save()
                self.save_m2m()
            return instance

Snippets Index View

The snippets index view shows counts of objects of each type. We will need those counts scoped to the current site. As discussed in Monkey Patching Wagtail, I subclass the SnippetIndexView and replace the query for snippets with a version of our own. Now the counts on the index page match the number of items on the model index pages.

    patched_url_patterns = [...
        path('admin/snippets/', MultitenantModelIndexView.as_view()),
    ]

    # views/snippets.py
    from wagtail.snippets.views.snippets import ModelIndexView

    class MultitenantModelIndexView(ModelIndexView):
        def _get_snippet_types(self):
            """
            Override this to restrict the model counts to items in the current site
            """
            current_site = Site.find_for_request(get_current_request2('SnippetsView'))
            return [
                {
                    "name": capfirst(model._meta.verbose_name_plural),
                    "count": model.objects.filter(site=current_site).all().count() ,  # PATCH
                    "model": model,
                }
                for model in get_snippet_models()
                if user_can_edit_snippet_type(self.request.user, model)
            ]

Permissions

The changes above are the customizations we have made to the Views and ViewSets. What I didn’t mention in my previous post about permission patches is that we also need to make sure we are only checking the model permissions assigned via groups that are used on the current site. This is done by customizing the _get_group_permissions from our Authentication backend. We subclass the ModelBackend from django.contrib.auth.backends and filter Permissions for groups named for the current site.

    # custom_auth/backends.py
    def _get_group_permissions(self, user_obj):
        """
        By default, Django's permission system assumes that if you are granted a Permission by ANY Group, you have that
        permission in all contexts. We override this method to ensure that a User is ONLY granted Permissions from the
        Groups they belong to on the current Site.
        """
        request = get_current_request2(f"{user_obj.username}'s Group permissions")
        if user_obj.is_superadmin:
            # Super Admins are treated as being members of the current Site's Admins group.
            return Permission.objects.filter(group__name=f'{Site.find_for_request(request).hostname} Admins')
        else:
            # Other users are treated as having only the permissions granted to them by Groups they belong to on
            # the current Site.
            return Permission.objects.filter(
                group__user=user_obj, group__name__startswith=Site.find_for_request(request).hostname
            )