Introduction

Sometimes, when building a Django application, we come across interesting data we want to use in our project. However, this data is not always available in a nice format such as an API or SQL file. If this data currently resides in a bunch of csv files, some of which can be over 100,000 lines long, our task is to create functionality in the Django admin that allows us to upload csv files to the database.

The project we are building needs to cater for the following:

  • Large files shouldn't take a ridiculous amount of time to upload
  • The function should handle integrity errors without crashing our application
  • The admin page should have a file upload form
  • The upload form should work on all models to ensure our application remains DRY (Dont Repeat Yourself)

Extending the Admin View

To begin, let's create the form that will be responsible for uploading our files:

class CsvUploadForm(forms.Form):
    csv_file = forms.FileField()

This form is rather simple, so I have included it in my applications admin.py file. However, feel free to create it in a forms.py and import it later.

Next, let's create the template. We need to extend the inbuilt change_list template. Calling block.super ensures existing functionality remains.

{% extends 'admin/change_list.html' %}
{% block content %}
    {{ block.super }}
    <form action="upload-csv/" method="post" enctype="multipart/form-data">
        {% csrf_token %}
        {{ csv_upload_form }}
        <input type="submit" value="Upload">
    </form>
{% endblock %}

Next, we need to extend the ModelAdmin class to include our new form. In admin.py, below where we defined our CsvUploadForm, create the following class:

class CsvUploadAdmin(admin.ModelAdmin):

    change_list_template = "custom_admin/csv_form.html"

    def get_urls(self):
        urls = super().get_urls()
        additional_urls = [
            path("upload-csv/", self.upload_csv),
        ]
        return additional_urls + urls

    def changelist_view(self, request, extra_context=None):
        extra = extra_context or {}
        extra["csv_upload_form"] = CsvUploadForm()
        return super(CsvUploadAdmin, self).changelist_view(request, extra_context=extra)

    def upload_csv(self, request):
        if request.method == "POST":

            # Here we will process the csv file  

        return redirect("..")

At this stage we need to define our new template, so the form will appear under our list of models.

The method get_urls() first calls super, then adds a new url to the list. This url will handle our form.

We then need to add some extra context to our admin view. In the method changelist_view(), we add our form class to the context. Finally, we create our method to handle the file. This method will be called when a post request is sent to the url upload-csv/

To use the new form in our admin class, we simply inherit from our new base class:

@admin.register(SomeModel)
class SomeModelAdmin(CsvUploadAdmin):
    pass

 

Saving the CSV to the database

Now let's add some functionality to our new form. Create a new class to handle the csv file and save it to the database. Then create an instance of the class and call its create_records() method. But before we do this let's do some basic validation on the file:

def upload_csv(self, request):
    if request.method == "POST":
        form = CsvUploadForm(request.POST, request.FILES)
        if form.is_valid():
            if request.FILES['csv_file'].name.endswith('csv'):

                try:
                    decoded_file = request.FILES['csv_file'].read().decode('utf-8')
                except UnicodeDecodeError as e:
                    self.message_user(
                        request,
                        "There was an error decoding the file:{}".format(e),
                        level=messages.ERROR
                    )
                    return redirect("..")

				# Here we will call our class method
				io_string = io.StringIO(decoded_file)
                uploader = CsvUploader(io_string, self.model)
				result = uploader.create_records()

           else:
               self.message_user(
               request,
               "Incorrect file type: {}".format(
                   request.FILES['csv_file'].name.split(".")[1]
                   ),
               level=messages.ERROR
               )

    else:
        self.message_user(
            request,
            "There was an error in the form {}".format(form.errors),
            level=messages.ERROR
        )

return redirect("..")

You may wish to go into more detail with the validation. As I am only using this to seed the database, the above was enough.

Now let's create the class that will handle saving our data. Create a new file called csv_uploader.py. In this file we first need to import the Python CSV module and some other modules from Django. We will be using these later.

import csv

from django.db.utils import IntegrityError
from django.core.exceptions import FieldDoesNotExist
from django.db import transaction

class CsvUploader:

    def __init__(self, csv_file, model_name, app_name):
        self.reader = list(csv.DictReader(csv_file, delimiter=','))
        self.keys = [k for k in self.reader[0]]
        self.model_fields = [f.name for f in  self.model._meta.get_fields()]
        self.valid = self._validate_csv()
        self.csv_pos = 0

    def _validate_csv(self):

        keys = []
        for k in self.keys:
            if k.endswith("_id"):
                keys.append(k[:-3])
            else:
                keys.append(k)
        return set(keys).issubset(self.model_fields)

In the __init__() method I pass the csv_file and the model we will be saving to. I then set some properties which will be helpful later on. The validate_csv() method checks that our csv column headings match our database structure. The csv_pos attribute keeps a record of how far through the CSV file we have read.

Now let's save it to our database! We will use Django's bulk create method to reduce the number of database queries. We don’t want our bulk create to be too big, as there are limits to the size of database queries. Let's break our data up into manageable chunks. The following method should do the trick:

def read_chunk(self):
    chunk = []
    for i in range(1000):
        try:
            chunk.append(self.model(**self.reader[self.csv_pos]))
        except IndexError as e:
            print(e)
            break
        self.csv_pos += 1
    return chunk

The final piece of the puzzle is to execute our queries. We first attempt to bulk_create each chunk of 1000. If that fails, we fall back to saving them one by one:

def create_records(self):

    if not self.valid:
        return "Invalid csv file"

    while True:
        chunk = self.read_chunk()

        if not chunk:
            break

        try:
            with transaction.atomic():
                self.model.objects.bulk_create(chunk)
        except IntegrityError as e:
            for i in chunk:
                try:
                    i.save()
                except IntegrityError:
                    continue
            print("Exeption: {}".format(e))

      return "records succesfully saved!"

If the bulk_create function raises an error, such as a duplicate primary key, we roll Django back. Then continue with executing our queries one by one. To do that we need to use transaction.atomic().