import json import logging from datetime import datetime import stripe from django.conf import settings from django.contrib import messages from django.contrib.auth.decorators import login_required from django.contrib.auth.mixins import LoginRequiredMixin from django.core.mail import mail_admins from django.http import ( HttpResponse, HttpResponseBadRequest, HttpResponseNotAllowed, HttpResponseRedirect, ) from django.shortcuts import redirect, render from django.urls import reverse_lazy from django.views import View from django.views.decorators.csrf import csrf_exempt from django.views.generic.edit import FormView from main import forms, models, util logger = logging.getLogger(__name__) def _create_setup_intent(customer_id): stripe.api_key = settings.STRIPE_API_KEY try: stripe_setup_intent = stripe.SetupIntent.create( automatic_payment_methods={"enabled": True}, customer=customer_id, ) except stripe.error.StripeError as ex: logger.error(str(ex)) raise Exception("Failed to create setup intent on Stripe.") from ex return { "stripe_client_secret": stripe_setup_intent["client_secret"], } def _create_stripe_subscription(customer_id): stripe.api_key = settings.STRIPE_API_KEY # expand subscription's latest invoice and invoice's payment_intent # so we can pass it to the front end to confirm the payment try: stripe_subscription = stripe.Subscription.create( customer=customer_id, items=[ { "price": settings.STRIPE_PRICE_ID, } ], payment_behavior="default_incomplete", payment_settings={"save_default_payment_method": "on_subscription"}, expand=["latest_invoice.payment_intent"], ) except stripe.error.StripeError as ex: logger.error(str(ex)) raise Exception("Failed to create subscription on Stripe.") from ex return { "stripe_subscription_id": stripe_subscription["id"], "stripe_client_secret": stripe_subscription["latest_invoice"]["payment_intent"][ "client_secret" ], } def _get_stripe_subscription(stripe_subscription_id): stripe.api_key = settings.STRIPE_API_KEY try: stripe_subscription = stripe.Subscription.retrieve(stripe_subscription_id) except stripe.error.StripeError as ex: logger.error(str(ex)) raise Exception("Failed to get subscription from Stripe.") from ex return stripe_subscription def _get_payment_methods(stripe_customer_id): """Get user's payment methods and transform them into a dictionary.""" stripe.api_key = settings.STRIPE_API_KEY # get default payment method id try: default_pm_id = stripe.Customer.retrieve( stripe_customer_id ).invoice_settings.default_payment_method except stripe.error.StripeError as ex: logger.error(str(ex)) raise Exception("Failed to retrieve customer data from Stripe.") from ex # get payment methods try: stripe_payment_methods = stripe.PaymentMethod.list( customer=stripe_customer_id, type="card", ) except stripe.error.StripeError as ex: logger.error(str(ex)) raise Exception("Failed to retrieve payment methods from Stripe.") from ex # normalise payment methods payment_methods = {} for stripe_pm in stripe_payment_methods.data: payment_methods[stripe_pm.id] = { "id": stripe_pm.id, "brand": stripe_pm.card.brand, "last4": stripe_pm.card.last4, "exp_month": stripe_pm.card.exp_month, "exp_year": stripe_pm.card.exp_year, "is_default": False, } if stripe_pm.id == default_pm_id: payment_methods[stripe_pm.id]["is_default"] = True return payment_methods def _get_invoices(stripe_customer_id): """Get user's invoices and transform them into a dictionary.""" stripe.api_key = settings.STRIPE_API_KEY # get user invoices try: stripe_invoices = stripe.Invoice.list(customer=stripe_customer_id) except stripe.error.StripeError as ex: logger.error(str(ex)) raise Exception("Failed to retrieve invoices data from Stripe.") from ex # normalise invoices objects invoice_list = [] for stripe_inv in stripe_invoices.data: invoice_list.append( { "id": stripe_inv.id, "url": stripe_inv.hosted_invoice_url, "pdf": stripe_inv.invoice_pdf, "period_start": datetime.fromtimestamp(stripe_inv.period_start), "period_end": datetime.fromtimestamp(stripe_inv.period_end), "created": datetime.fromtimestamp(stripe_inv.created), } ) return invoice_list @login_required def billing_index(request): """ View method that shows the billing index, a summary of subscription and payment methods. """ # respond for grandfathered users first if request.user.is_grandfathered: return render( request, "main/billing_index.html", { "is_grandfathered": True, }, ) # respond for monero case if request.user.monero_address: return render(request, "main/billing_index.html") stripe.api_key = settings.STRIPE_API_KEY # create stripe customer for user if it does not exist if not request.user.stripe_customer_id: try: stripe_response = stripe.Customer.create() except stripe.error.StripeError as ex: logger.error(str(ex)) raise Exception("Failed to create customer on Stripe.") from ex request.user.stripe_customer_id = stripe_response["id"] request.user.save() # get subscription if exists current_period_start = None current_period_end = None if request.user.stripe_subscription_id: subscription = _get_stripe_subscription(request.user.stripe_subscription_id) current_period_start = datetime.utcfromtimestamp( subscription["current_period_start"] ) current_period_end = datetime.utcfromtimestamp( subscription["current_period_end"] ) # transform into list of values payment_methods = _get_payment_methods(request.user.stripe_customer_id).values() return render( request, "main/billing_index.html", { "stripe_customer_id": request.user.stripe_customer_id, "stripe_public_key": settings.STRIPE_PUBLIC_KEY, "stripe_price_id": settings.STRIPE_PRICE_ID, "current_period_end": current_period_end, "current_period_start": current_period_start, "payment_methods": payment_methods, "invoice_list": _get_invoices(request.user.stripe_customer_id), }, ) class BillingSubscribe(LoginRequiredMixin, FormView): form_class = forms.StripeForm template_name = "main/billing_subscribe.html" success_url = reverse_lazy("billing_index") success_message = "premium subscription enabled" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["stripe_public_key"] = settings.STRIPE_PUBLIC_KEY return context def get(self, request, *args, **kwargs): stripe.api_key = settings.STRIPE_API_KEY data = _create_stripe_subscription(request.user.stripe_customer_id) request.user.stripe_subscription_id = data["stripe_subscription_id"] request.user.save() url = f"{util.get_protocol()}//{settings.CANONICAL_HOST}" url += reverse_lazy("billing_welcome") context = self.get_context_data() context["stripe_client_secret"] = data["stripe_client_secret"] context["stripe_return_url"] = url return self.render_to_response(context) def post(self, request, *args, **kwargs): form_class = self.get_form_class() form = self.get_form(form_class) if form.is_valid(): messages.success(request, self.success_message) return self.form_valid(form) else: return self.form_invalid(form) class BillingCard(LoginRequiredMixin, FormView): form_class = forms.StripeForm template_name = "main/billing_card.html" success_url = reverse_lazy("billing_index") success_message = "new card added" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["stripe_public_key"] = settings.STRIPE_PUBLIC_KEY return context def get(self, request, *args, **kwargs): stripe.api_key = settings.STRIPE_API_KEY context = self.get_context_data() data = _create_setup_intent(request.user.stripe_customer_id) context["stripe_client_secret"] = data["stripe_client_secret"] url = f"{util.get_protocol()}//{settings.CANONICAL_HOST}" url += reverse_lazy("billing_card_confirm") context["stripe_return_url"] = url return self.render_to_response(context) def post(self, request, *args, **kwargs): form_class = self.get_form_class() form = self.get_form(form_class) if form.is_valid(): messages.success(request, self.success_message) return self.form_valid(form) else: return self.form_invalid(form) class BillingCardDelete(LoginRequiredMixin, View): """View that deletes a card from a user on Stripe.""" template_name = "main/billing_card_confirm_delete.html" success_url = reverse_lazy("billing_index") success_message = "card deleted" slug_url_kwarg = "stripe_payment_method_id" # dict of valid payment methods with id as key and an obj as val stripe_payment_methods = {} def get_context_data(self, **kwargs): card_id = self.kwargs.get(self.slug_url_kwarg) context = { "card": self.stripe_payment_methods[card_id], } return context def post(self, request, *args, **kwargs): card_id = self.kwargs.get(self.slug_url_kwarg) try: stripe.PaymentMethod.detach(card_id) except stripe.error.StripeError as ex: logger.error(str(ex)) messages.error(request, "payment processor unresponsive; please try again") return redirect(reverse_lazy("billing_index")) messages.success(request, self.success_message) return HttpResponseRedirect(self.success_url) def get(self, request, *args, **kwargs): context = self.get_context_data(**kwargs) return render(request, self.template_name, context) def dispatch(self, request, *args, **kwargs): if not request.user.stripe_customer_id: mail_admins( "User tried to delete card without stripe_customer_id", f"user.id={request.user.id}\nuser.username={request.user.username}", ) messages.error( request, "something has gone terribly wrong but we were just notified about it", ) return redirect("dashboard") self.stripe_payment_methods = _get_payment_methods( request.user.stripe_customer_id ) # check if card id is valid for user card_id = self.kwargs.get(self.slug_url_kwarg) if card_id not in self.stripe_payment_methods: mail_admins( "User tried to delete card with invalid Stripe card ID", f"user.id={request.user.id}\nuser.username={request.user.username}", ) messages.error( request, "this is not a valid card ID", ) return redirect("dashboard") return super().dispatch(request, *args, **kwargs) @login_required def billing_card_default(request, stripe_payment_method_id): """View method that changes the default card of a user on Stripe.""" if request.method != "POST": return HttpResponseNotAllowed(["POST"]) stripe_payment_methods = _get_payment_methods(request.user.stripe_customer_id) if stripe_payment_method_id not in stripe_payment_methods: return HttpResponseBadRequest("Invalid Card ID.") stripe.api_key = settings.STRIPE_API_KEY try: stripe.Customer.modify( request.user.stripe_customer_id, invoice_settings={ "default_payment_method": stripe_payment_method_id, }, ) except stripe.error.StripeError as ex: logger.error(str(ex)) return HttpResponse("Could not change default card.", status=503) messages.success(request, "default card updated") return redirect("billing_index") class BillingCancel(LoginRequiredMixin, View): """View that cancels a user subscription on Stripe.""" template_name = "main/billing_subscription_cancel.html" success_url = reverse_lazy("billing_index") success_message = "premium subscription canceled" def post(self, request, *args, **kwargs): subscription = _get_stripe_subscription(request.user.stripe_subscription_id) try: stripe.Subscription.delete(subscription["id"]) except stripe.error.StripeError as ex: logger.error(str(ex)) return HttpResponse("Subscription could not be canceled.", status=503) request.user.is_premium = False request.user.stripe_subscription_id = None request.user.save() mail_admins( f"Cancellation premium subscriber: {request.user.username}", f"{request.user.blog_absolute_url}\n", ) messages.success(request, self.success_message) return HttpResponseRedirect(self.success_url) def get(self, request, *args, **kwargs): return render(request, self.template_name) def dispatch(self, request, *args, **kwargs): # redirect grandfathered users to dashboard if request.user.is_grandfathered: return redirect("dashboard") # if user has no customer id, redirect to billing_index to have it generated if not request.user.stripe_customer_id: return redirect("billing_index") # if user is not premium, redirect if not request.user.is_premium: return redirect("billing_index") subscription = _get_stripe_subscription(request.user.stripe_subscription_id) if not subscription: return redirect("billing_index") return super().dispatch(request, *args, **kwargs) @login_required def billing_subscription(request): """ View that creates a new subscription for user on Stripe, given they already have a card registered. """ if request.method != "POST": return HttpResponseNotAllowed(["POST"]) # redirect grandfathered users to dashboard if request.user.is_grandfathered: return redirect("dashboard") data = _create_stripe_subscription(request.user.stripe_customer_id) request.user.stripe_subscription_id = data["stripe_subscription_id"] request.user.is_premium = True request.user.is_approved = True request.user.save() mail_admins( f"New premium subscriber: {request.user.username}", f"{request.user.blog_absolute_url}\n\n{request.user.blog_url}", ) messages.success(request, "premium subscription enabled") return redirect("billing_index") @login_required def billing_welcome(request): """ View that Stripe returns to if subscription initialisation is successful. Adds a message alert and redirects to billing_index. """ payment_intent = request.GET.get("payment_intent") stripe.api_key = settings.STRIPE_API_KEY stripe_intent = stripe.PaymentIntent.retrieve(payment_intent) if stripe_intent["status"] == "succeeded": request.user.is_premium = True request.user.is_approved = True request.user.save() mail_admins( f"New premium subscriber: {request.user.username}", f"{request.user.blog_absolute_url}\n\n{request.user.blog_url}", ) messages.success(request, "premium subscription enabled") elif stripe_intent["status"] == "processing": messages.info(request, "payment is currently processing") else: messages.error( request, "something is wrong. don't sweat it, worst case you get premium for free", ) return redirect("billing_index") @login_required def billing_card_confirm(request): setup_intent = request.GET.get("setup_intent") stripe.api_key = settings.STRIPE_API_KEY stripe_intent = stripe.SetupIntent.retrieve(setup_intent) if stripe_intent["status"] == "succeeded": messages.success(request, "payment method added") elif stripe_intent["status"] == "processing": messages.info(request, "payment method addition processing") elif stripe_intent["status"] == "requires_payment_method": messages.info(request, "error setting up payment method :(") else: messages.error( request, "something is wrong. don't sweat it, worst case you get premium for free", ) return redirect("billing_index") @csrf_exempt def billing_stripe_webhook(request): """ Handle Stripe webhooks. See: https://stripe.com/docs/webhooks """ stripe.api_key = settings.STRIPE_API_KEY data = json.loads(request.body) try: event = stripe.Event.construct_from(data, stripe.api_key) except ValueError as ex: logger.error(str(ex)) return HttpResponse(status=400) if event.type == "payment_intent.created": payment_intent = event.data.object if payment_intent.customer is None: return HttpResponse(status=400) user = models.User.objects.filter( stripe_customer_id=payment_intent.customer.id ).is_premium = True user.save() return HttpResponse()