from datetime import date, datetime, timedelta from urllib.parse import urlencode from django.conf import settings from django.core.paginator import EmptyPage, PageNotAnInteger, Paginator from django.db.models import Avg, Count, F, Max, Q, Sum from django.db.models.expressions import Func from django.db.models.functions import ( Coalesce, Length, TruncDay, TruncMonth, TruncWeek, ) from django.http import Http404, JsonResponse from django.shortcuts import get_object_or_404, redirect, render from django.utils import timezone from main import models def index(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") return render(request, "main/moderation_index.html") def user_cards(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") user_list = ( models.User.objects.annotate(count=Count("post")) .filter(count__gt=0, is_approved=False) .order_by("?") ) user = user_list.first() post_list = models.Post.objects.filter(owner=user) post_list_halfpoint = post_list.count() // 2 if post_list.count() > 1 else 1 post_list_a = post_list[:post_list_halfpoint] post_list_b = post_list[post_list_halfpoint:] return render( request, "main/moderation_user_single.html", { "user": user, "user_count": user_list.count(), "post_list_a": post_list_a, "post_list_b": post_list_b, "TRANSLATE_API_URL": settings.TRANSLATE_API_URL, "TRANSLATE_API_TOKEN": settings.TRANSLATE_API_TOKEN, "DEBUG": "true" if settings.DEBUG else "false", }, ) def user_list(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") # build base queryset user_qs = models.User.objects.all() # handle filters via mode param mode_param = request.GET.get("mode") if mode_param: mode = mode_param.split(",") if "noapprove" in mode: user_qs = user_qs.filter(is_approved=False) if "noempty" in mode: user_qs = user_qs.annotate(count=Count("post")).filter(count__gt=0) if "premium" in mode: user_qs = user_qs.filter(is_premium=True) # ordering if "reverse" in mode: user_qs = user_qs.order_by("id") else: user_qs = user_qs.order_by("-id") else: # default ordering for simple case user_qs = user_qs.order_by("-id") current_modes = mode if mode_param else [] # prefetch posts for listed users user_qs = user_qs.prefetch_related("post_set") # pagination per_page_default = 100 try: per_page = int(request.GET.get("per_page", per_page_default)) except (TypeError, ValueError): per_page = per_page_default paginator = Paginator(user_qs, per_page) page_number = request.GET.get("page") try: page_obj = paginator.page(page_number) except PageNotAnInteger: page_obj = paginator.page(1) except EmptyPage: page_obj = paginator.page(paginator.num_pages) # preserve existing non-page query params in pagination links params = request.GET.copy() params.pop("page", None) querystring = params.urlencode() # Build clickable filter links def link_for_modes(modes_list: list[str]) -> str: query: dict[str, str] = {} # preserve per_page if per_page != per_page_default: query["per_page"] = str(per_page) if modes_list: query["mode"] = ",".join(modes_list) return f"?{urlencode(query)}" if query else "?" all_filter_keys = ["noapprove", "noempty", "premium", "reverse"] filters = [] for key in all_filter_keys: is_active = key in current_modes if is_active: new_modes = [m for m in current_modes if m != key] else: new_modes = current_modes + [key] filters.append( { "key": key, "active": is_active, "url": link_for_modes(new_modes), } ) clear_filters_url = link_for_modes([]) return render( request, "main/moderation_user_list.html", { "page_obj": page_obj, "paginator": paginator, "is_paginated": paginator.num_pages > 1, "user_list": page_obj.object_list, "querystring": querystring, "filters": filters, "clear_filters_url": clear_filters_url, "TRANSLATE_API_URL": settings.TRANSLATE_API_URL, "TRANSLATE_API_TOKEN": settings.TRANSLATE_API_TOKEN, "DEBUG": "true" if settings.DEBUG else "false", }, ) def user_delete(request, user_id): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() user = get_object_or_404(models.User, id=user_id) if request.method == "POST": user.delete() return JsonResponse({"ok": True}) raise Http404() def user_approve(request, user_id): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() user = get_object_or_404(models.User, id=user_id) if request.method == "POST": user.is_approved = True user.save() return JsonResponse({"ok": True}) raise Http404() def user_unapprove(request, user_id): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() user = get_object_or_404(models.User, id=user_id) if request.method == "POST": user.is_approved = False user.save() return JsonResponse({"ok": True}) raise Http404() def images_leaderboard(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") # determine sort mode from querystring similar to moderation users page mode_param = request.GET.get("mode") current_modes: list[str] = mode_param.split(",") if mode_param else [] sort_by_mb = "bymb" in current_modes reverse = "reverse" in current_modes users_with_counts = models.User.objects.annotate( image_count=Count("image"), image_bytes=Sum(Func(F("image__data"), function="octet_length")), ).filter(image_count__gt=0) if sort_by_mb: ordering = ["image_bytes", "id"] if reverse else ["-image_bytes", "-id"] else: ordering = ["image_count", "id"] if reverse else ["-image_count", "-id"] users_with_counts = users_with_counts.order_by(*ordering) # Keep it lightweight; show top 200 by default top_limit = 200 user_list = list(users_with_counts[:top_limit]) for u in user_list: total_bytes = u.image_bytes or 0 u.image_megabytes = round(total_bytes / (1024 * 1024), 2) # build filter links def link_for_modes(modes_list: list[str]) -> str: query: dict[str, str] = {} if modes_list: query["mode"] = ",".join(modes_list) from urllib.parse import urlencode as _urlencode # local alias return f"?{_urlencode(query)}" if query else "?" # build filters ensuring only one of byimages/bymb is active at once filters: list[dict[str, str | bool]] = [] filters.append( { "key": "byimages", "active": "byimages" in current_modes or (not current_modes), "url": link_for_modes( [m for m in current_modes if m not in ["byimages", "bymb"]] + ["byimages"] ), } ) # bymb filters.append( { "key": "bymb", "active": "bymb" in current_modes, "url": link_for_modes( [m for m in current_modes if m not in ["byimages", "bymb"]] + ["bymb"] ), } ) # reverse toggle filters.append( { "key": "reverse", "active": "reverse" in current_modes, "url": link_for_modes( [m for m in current_modes if m != "reverse"] if "reverse" in current_modes else current_modes + ["reverse"] ), } ) clear_filters_url = link_for_modes([]) return render( request, "main/moderation_images.html", { "user_list": user_list, "top_limit": top_limit, "filters": filters, "clear_filters_url": clear_filters_url, }, ) def posts_leaderboard(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") # sort modes mode_param = request.GET.get("mode") current_modes: list[str] = mode_param.split(",") if mode_param else [] # default sort is by total posts desc sort_key = "byposts" if "bypublished" in current_modes: sort_key = "bypublished" elif "bydrafts" in current_modes: sort_key = "bydrafts" reverse = "reverse" in current_modes users_with_post_stats = models.User.objects.annotate( posts_total=Count("post"), posts_published=Count("post", filter=Q(post__published_at__isnull=False)), posts_drafts=Count("post", filter=Q(post__published_at__isnull=True)), last_post_date=Max("post__published_at"), ).filter(posts_total__gt=0) if sort_key == "bypublished": ordering = ["posts_published", "id"] if reverse else ["-posts_published", "-id"] elif sort_key == "bydrafts": ordering = ["posts_drafts", "id"] if reverse else ["-posts_drafts", "-id"] else: # byposts ordering = ["posts_total", "id"] if reverse else ["-posts_total", "-id"] users_with_post_stats = users_with_post_stats.order_by(*ordering) # limit (configurable via ?limit=) default_limit = 200 try: top_limit = int(request.GET.get("limit", default_limit)) except (TypeError, ValueError): top_limit = default_limit if top_limit <= 0: top_limit = default_limit user_list = list(users_with_post_stats[:top_limit]) # filters def link_for_modes(modes_list: list[str]) -> str: query: dict[str, str] = {} if modes_list: query["mode"] = ",".join(modes_list) from urllib.parse import urlencode as _urlencode # local alias return f"?{_urlencode(query)}" if query else "?" filters: list[dict[str, str | bool]] = [] # byposts filters.append( { "key": "byposts", "active": ("byposts" in current_modes) or (not current_modes) or not ("bypublished" in current_modes or "bydrafts" in current_modes), "url": link_for_modes( [ m for m in current_modes if m not in ["byposts", "bypublished", "bydrafts"] ] + ["byposts"] ), } ) # bypublished filters.append( { "key": "bypublished", "active": "bypublished" in current_modes, "url": link_for_modes( [ m for m in current_modes if m not in ["byposts", "bypublished", "bydrafts"] ] + ["bypublished"] ), } ) # bydrafts filters.append( { "key": "bydrafts", "active": "bydrafts" in current_modes, "url": link_for_modes( [ m for m in current_modes if m not in ["byposts", "bypublished", "bydrafts"] ] + ["bydrafts"] ), } ) # reverse filters.append( { "key": "reverse", "active": "reverse" in current_modes, "url": link_for_modes( [m for m in current_modes if m != "reverse"] if "reverse" in current_modes else current_modes + ["reverse"] ), } ) clear_filters_url = link_for_modes([]) return render( request, "main/moderation_posts.html", { "user_list": user_list, "top_limit": top_limit, "filters": filters, "clear_filters_url": clear_filters_url, }, ) def stats(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") # Users total_users = models.User.objects.count() approved_users = models.User.objects.filter(is_approved=True).count() premium_users = models.User.objects.filter(is_premium=True).count() users_with_custom_domain = ( models.User.objects.exclude(custom_domain__isnull=True) .exclude(custom_domain__exact="") .count() ) # Posts total_posts = models.Post.objects.count() published_posts = models.Post.objects.filter(published_at__isnull=False).count() draft_posts = models.Post.objects.filter(published_at__isnull=True).count() latest_post_date = models.Post.objects.aggregate(last=Max("published_at")) # Pages total_pages = models.Page.objects.count() # Images image_stats = models.Image.objects.aggregate( count=Count("id"), total_bytes=Sum(Func(F("data"), function="octet_length")), ) total_images = image_stats["count"] or 0 total_image_megabytes = round((image_stats["total_bytes"] or 0) / (1024 * 1024), 2) # Comments total_comments = models.Comment.objects.count() approved_comments = models.Comment.objects.filter(is_approved=True).count() # Notifications (subscribers) and sends total_subscribers = models.Notification.objects.count() active_subscribers = models.Notification.objects.filter(is_active=True).count() total_sends = models.NotificationRecord.objects.count() # Snapshots total_snapshots = models.Snapshot.objects.count() # Averages avg_posts_per_user = models.User.objects.annotate(c=Count("post")).aggregate( avg=Avg("c") ) avg_posts_per_user_val = round(float(avg_posts_per_user["avg"] or 0), 2) context = { "totals": { "users": total_users, "users_approved": approved_users, "users_premium": premium_users, "users_with_custom_domain": users_with_custom_domain, "posts": total_posts, "posts_published": published_posts, "posts_draft": draft_posts, "pages": total_pages, "images": total_images, "images_mb": total_image_megabytes, "comments": total_comments, "comments_approved": approved_comments, "subscribers": total_subscribers, "subscribers_active": active_subscribers, "notification_sends": total_sends, "snapshots": total_snapshots, }, "averages": { "posts_per_user": avg_posts_per_user_val, }, "latest": { "last_post_date": latest_post_date["last"], }, # leave heavy sections to dedicated pages for performance } return render(request, "main/moderation_stats.html", context) def activity(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") now = timezone.now() d90 = now - timedelta(days=90) # New users/posts per day/week/month new_users_daily_qs = ( models.User.objects.filter(date_joined__gte=d90) .annotate(period=TruncDay("date_joined")) .values("period") .annotate(count=Count("id")) .order_by("period") ) new_posts_daily_qs = ( models.Post.objects.filter(created_at__gte=d90) .annotate(period=TruncDay("created_at")) .values("period") .annotate(count=Count("id")) .order_by("period") ) new_users_weekly_qs = ( models.User.objects.filter(date_joined__gte=now - timedelta(weeks=26)) .annotate(period=TruncWeek("date_joined")) .values("period") .annotate(count=Count("id")) .order_by("period") ) new_posts_weekly_qs = ( models.Post.objects.filter(created_at__gte=now - timedelta(weeks=26)) .annotate(period=TruncWeek("created_at")) .values("period") .annotate(count=Count("id")) .order_by("period") ) # helper to build cumulative series def cumulative_points(qs): points = [] total = 0 for row in qs: total += row["count"] points.append({"period": row["period"], "cumulative": total}) return points cum_users_daily = cumulative_points(new_users_daily_qs) cum_posts_daily = cumulative_points(new_posts_daily_qs) # Prepare SVG bar chart data (similar style to transparency page) def prepare_chart(rows_qs, limit): rows = list(rows_qs) if not rows: return [] # take the last N items (most recent) subset = rows[-limit:] if len(rows) > limit else rows # scale within the subset highest = max((r["count"] for r in subset), default=0) or 1 chart = [] x_offset = 0 # iterate in chronological order (oldest on the left, newest on the right) for r in subset: c = r["count"] or 0 count_percent = 1 if highest and c: count_percent = (c * 100) / highest if count_percent < 1: count_percent = 1 chart.append( { "period": r["period"], "count": c, "x_offset": x_offset, "count_percent": count_percent, "negative_count_percent": 100 - count_percent, } ) x_offset += 20 return chart chart_new_users_daily = prepare_chart(new_users_daily_qs, limit=20) chart_new_posts_daily = prepare_chart(new_posts_daily_qs, limit=20) chart_new_users_weekly = prepare_chart(new_users_weekly_qs, limit=12) chart_new_posts_weekly = prepare_chart(new_posts_weekly_qs, limit=12) # dynamic widths so charts don't leave big empty space at the end chart_new_users_daily_width = max(len(chart_new_users_daily) * 20, 1) chart_new_posts_daily_width = max(len(chart_new_posts_daily) * 20, 1) chart_new_users_weekly_width = max(len(chart_new_users_weekly) * 20, 1) chart_new_posts_weekly_width = max(len(chart_new_posts_weekly) * 20, 1) # Cumulative posts by month from 1 May 2020 (counts based on created_at) start_month = date(2020, 5, 1) # aggregate counts per month for posts created after start_month monthly_counts_qs = ( models.Post.objects.filter(created_at__gte=start_month) .annotate(period=TruncMonth("created_at")) .values("period") .annotate(count=Count("id")) .order_by("period") ) # map first-of-month -> count for that month monthly_counts_map: dict[date, int] = {} for row in monthly_counts_qs: period_dt = row["period"] month_key = date(period_dt.year, period_dt.month, 1) monthly_counts_map[month_key] = row["count"] or 0 # generate continuous month sequence up to current month today = timezone.now().date() end_month = date(today.year, today.month, 1) def _add_one_month(d: date) -> date: if d.month == 12: return date(d.year + 1, 1, 1) return date(d.year, d.month + 1, 1) cumulative_posts_monthly: list[dict[str, object]] = [] running_total = 0 cursor = start_month while cursor <= end_month: running_total += monthly_counts_map.get(cursor, 0) cumulative_posts_monthly.append({"period": cursor, "cumulative": running_total}) cursor = _add_one_month(cursor) # Build chart data for cumulative monthly posts if cumulative_posts_monthly: highest_cum = max(r["cumulative"] for r in cumulative_posts_monthly) or 1 else: highest_cum = 1 chart_cumulative_posts_monthly: list[dict[str, object]] = [] x_offset = 0 for r in cumulative_posts_monthly: v = r["cumulative"] or 0 count_percent = 1 if highest_cum and v: count_percent = (v * 100) / highest_cum if count_percent < 1: count_percent = 1 chart_cumulative_posts_monthly.append( { "period": r["period"], "count": v, "x_offset": x_offset, "count_percent": count_percent, "negative_count_percent": 100 - count_percent, } ) x_offset += 20 chart_cumulative_posts_monthly_width = max( len(chart_cumulative_posts_monthly) * 20, 1 ) context = { "cum_users_daily": cum_users_daily, "cum_posts_daily": cum_posts_daily, "chart_new_users_daily": chart_new_users_daily, "chart_new_posts_daily": chart_new_posts_daily, "chart_new_users_weekly": chart_new_users_weekly, "chart_new_posts_weekly": chart_new_posts_weekly, "chart_new_users_daily_width": chart_new_users_daily_width, "chart_new_posts_daily_width": chart_new_posts_daily_width, "chart_new_users_weekly_width": chart_new_users_weekly_width, "chart_new_posts_weekly_width": chart_new_posts_weekly_width, "cumulative_posts_monthly_from_2020": cumulative_posts_monthly, "chart_cumulative_posts_monthly": chart_cumulative_posts_monthly, "chart_cumulative_posts_monthly_width": chart_cumulative_posts_monthly_width, } return render(request, "main/moderation_activity.html", context) def cohorts(request): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") now = timezone.now() d30 = now - timedelta(days=30) most_published_30 = list( models.User.objects.annotate( cnt=Count( "post", filter=Q( post__published_at__isnull=False, post__published_at__gte=d30.date() ), ) ) .filter(cnt__gt=0) .order_by("-cnt", "-id") .values("id", "username", "cnt")[:20] ) largest_blogs_by_bytes = list( models.User.objects.annotate(total_bytes=Sum(Coalesce(Length("post__body"), 0))) .filter(total_bytes__gt=0) .order_by("-total_bytes", "-id") .values("id", "username", "total_bytes")[:20] ) context = { "leaders": { "most_published_30": most_published_30, "largest_blogs_by_bytes": largest_blogs_by_bytes, }, "CANONICAL_HOST": settings.CANONICAL_HOST, } return render(request, "main/moderation_cohorts.html", context) def summary(request, date_str: str): if not request.user.is_authenticated or not request.user.is_superuser: raise Http404() if hasattr(request, "subdomain"): return redirect(f"//{settings.CANONICAL_HOST}{request.path}") try: target_date = datetime.strptime(date_str, "%Y-%m-%d").date() except (TypeError, ValueError) as err: raise Http404() from err prev_date = target_date - timedelta(days=1) next_date = target_date + timedelta(days=1) new_users_qs = models.User.objects.filter(date_joined__date=target_date).order_by( "-id" ) new_posts_qs = ( models.Post.objects.filter(created_at__date=target_date) .select_related("owner") .order_by("-created_at") ) new_pages_qs = ( models.Page.objects.filter(created_at__date=target_date) .select_related("owner") .order_by("-created_at") ) new_comments_qs = ( models.Comment.objects.filter(created_at__date=target_date) .select_related("post", "post__owner") .order_by("-created_at") ) post_visits_count = models.AnalyticPost.objects.filter( created_at__date=target_date ).count() top_posts_by_visits_qs = ( models.Post.objects.filter(analyticpost__created_at__date=target_date) .annotate( visit_count=Count( "analyticpost", filter=Q(analyticpost__created_at__date=target_date) ) ) .select_related("owner") .order_by("-visit_count", "-id") ) context = { "target_date": target_date, "prev_date": prev_date, "next_date": next_date, "counts": { "users": new_users_qs.count(), "posts": new_posts_qs.count(), "pages": new_pages_qs.count(), "comments": new_comments_qs.count(), "post_visits": post_visits_count, }, "new_users": list(new_users_qs), "new_posts": list(new_posts_qs), "new_pages": list(new_pages_qs), "new_comments": list(new_comments_qs), "top_posts_by_visits": list(top_posts_by_visits_qs[:20]), } return render(request, "main/moderation_summary.html", context)