In my previous post, Creating an API with python: Part 4: DELETE Endpoints, I added three DELETE endpoints to the FastAPI API. In this post, I’ll add oauth2 authentication with JWT tokens.
Prerequisites
- Creating an API with python: Part 1: GET Endpoints
- Creating an API with python: Part 2: MariaDB Database
- Creating an API with python: Part 3: POST Endpoints
- Creating an API with python: Part 4: DELETE Endpoints
Step 1: Update the Python Venv
Add the packages python-jose
, passlib
and python-multipart
to your FastAPI venv.
-
Change to the code directory (
~/vboxshare/fastapi
should be replaced with the path to your FastAPI python code):$ cd ~/vboxshare/fastapi
- Activate the FastAPI venv:
$ . ~/.venv-fastapi/bin/activate
-
Install the new packages (note that we need the
cryptography
andbcrypt
extra backends):(.venv-fastapi) $ pip install python-multipart (.venv-fastapi) $ pip install "python-jose[cryptography]" (.venv-fastapi) $ pip install "passlib[bcrypt]"
Step 2: Add User Database Table
- Connect to MariaDB from the command line, using the root password you set up in Part 2:
mysql -u root -p
- You should now see the MariaDB prompt. Switch to the
apiservice
database:MariaDB [(none)]> USE apiservice;
- Create the
user
table:MariaDB [(apiservice)]> CREATE TABLE IF NOT EXISTS user (user_id CHAR(36) NOT NULL DEFAULT UUID(), username VARCHAR(255), hashed_password VARCHAR(255), PRIMARY KEY (user_id), INDEX (username));
View its details with:MariaDB [(apiservice)]> DESCRIBE user;
Step 3: Get a hashed password
Create a hash_password.py
script which can be used to encrypt the user password before adding it to the user
database table.
-
Change to the code directory (
~/vboxshare/fastapi
should be replaced with the path to your FastAPI python code):$ cd ~/vboxshare/fastapi
- Create a new file called
hash_password.py
and add the following code to it:#!/home/osboxes/.venv-fastapi/bin/python import os from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") password = os.environ.get('PASSWORD') def get_password_hash(password): return pwd_context.hash(password) if __name__ == "__main__": hashed_password = get_password_hash(password) print(hashed_password)
-
On the command line, set your chosen user password as an environment variable. Replace
YOUR_PASSWORD
with your chosen password. Be sure to include the single quotes around the password and make sure your password doesn’t contain any single quotes.$ export PASSWORD='YOUR_PASSWORD'
-
Get your hashed password by running:
$ . ~/.venv-fastapi/bin/activate # If not already activated (.venv-fastapi) $ python hash_password.py
- Make a note of the hashed password. You will need it to create a user in the next step.
Step 4: Add a User
- Connect to MariaDB from the command line, using the root password you set up in Part 2:
mysql -u root -p
- You should now see the MariaDB prompt. Switch to the
apiservice
database:MariaDB [(none)]> USE apiservice;
- Add a new user with the command below. Be sure to replace
HASHED_PASSWORD
with the hashed password you generated in the last step:MariaDB [(apiservice)]> INSERT INTO user SET username='apiuser', hashed_password='HASHED_PASSWORD';
View the newly created user with:MariaDB [(apiservice)]> SELECT * FROM user;
Step 5: Add authentication.py
Create an authentication.py
file which contains all the functionality needed for authenticating a user.
-
Change to the code directory (
~/vboxshare/fastapi
should be replaced with the path to your FastAPI python code):$ cd ~/vboxshare/fastapi
-
Change to the manager directory.
$ cd manager
- Create a new file called
authentication.py
and add the following code to it:from datetime import datetime, timedelta from typing import Union from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from sqlalchemy.orm import Session from manager import CONFIG, schemas, models from manager.database import get_db AUTH_CONFIG = CONFIG['authentication'] # to get a string like this run: # openssl rand -hex 32 SECRET_KEY = AUTH_CONFIG['secret_key'] ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 30 pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") app = FastAPI() def verify_password(plain_password, hashed_password): return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_user(db: Session, username: str): return db.query(models.User).filter(models.User.username == username).first() def authenticate_user(db: Session, username: str, password: str): user = get_user(db, username) if not user: return False if not verify_password(password, user.hashed_password): return False return user def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None): to_encode = data.copy() if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": expire}) encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(db: Session = Depends(get_db), token: str = Depends(oauth2_scheme)): credentials_exception = HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) username: str = payload.get("sub") if username is None: raise credentials_exception token_data = schemas.TokenData(username=username) except JWTError: raise credentials_exception user = get_user(db, token_data.username) if user is None: raise credentials_exception return user async def get_current_active_user(current_user: schemas.User = Depends(get_current_user)): return current_user
Step 6: Update config
In this step, we will generate a secret key. Then we will update the config.yaml
file with a new authentication section that contains the secret key. Finally, we’ll move the config loading code to the manager/__init__.py
file, in order that it can be retrieved from multiple modules.
-
Change to the code directory (
~/vboxshare/fastapi
should be replaced with the path to your FastAPI python code):$ cd ~/vboxshare/fastapi
-
On the command-line, run:
$ openssl rand -hex 32
You should see a long string of numbers and letters. Copy this value. -
Open the
config.yaml
file and add an authentication section under the database section. The whole file should look something like the below, withPASSWORD
replaced with your database password in quotes, andSECRET_KEY
replaced with the string you just generated:# database config database: username: apiservice_user password: 'PASSWORD' host: 127.0.0.1 name: apiservice port: 3306 authentication: # to get a string like this run: # openssl rand -hex 32 secret_key: SECRET_KEY
-
Change to the manager directory.
$ cd manager
- Open the
__init__.py
file and add the following code to it:import yaml with open("config.yaml") as file_handle: CONFIG = yaml.safe_load(file_handle)
Step 7: Update database.py
Update database.py
with an import for CONFIG
and add the get_db
function.
- Replace the code in
database.py
with the following code:from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.orm import sessionmaker from manager import CONFIG database_config = CONFIG['database'] username = database_config['username'] password = database_config['password'] host = database_config['host'] name = database_config['name'] port = database_config['port'] SQLALCHEMY_DATABASE_URL = f"mariadb+mariadbconnector://{username}:{password}@{host}:{port}/{name}" engine = create_engine(SQLALCHEMY_DATABASE_URL) SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() # Dependency def get_db(): db = SessionLocal() try: yield db except Exception: db.rollback() finally: db.close()
Step 8: Update schemas.py
Add new schemas to schemas.py
for the new user and token objects.
- Append the code in
schemas.py
with the following:class Token(BaseModel): access_token: str token_type: str expires: str class TokenData(BaseModel): username: Union[str, None] = None class User(BaseModel): user_id: str username: str class Config: orm_mode = True
Step 9: Update models.py
Update models.py
with a new class, User
, for the user
database table.
- Append the code in
models.py
with the following:class User(Base): __tablename__ = "user" user_id = Column(String, primary_key=True, index=True) username = Column(String) hashed_password = Column(String)
Step 10: Update main.py
Update main.py
with new imports, and a new endpoint (token
) for requesting a JWT access token.
- Change directory to the top level directory:
$ cd ../
- Update the import section of
main.py
with the following code. Note that theget_db
function should be removed, as it has moved todatabase.py
:from typing import Optional from datetime import timedelta, datetime from sqlalchemy.orm import Session from fastapi import FastAPI, HTTPException, Depends, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from manager import manager, schemas, authentication from manager.database import get_db app = FastAPI()
-
Add the
token
endpoint with the following code appended to the file:@app.post("/token/", response_model=schemas.Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): user = authentication.authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=authentication.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = authentication.create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) expires = datetime.utcnow() + access_token_expires return {"access_token": access_token, "token_type": "bearer", "expires": expires.isoformat()}
Step 11: Start FastAPI
Start the FastAPI server with:
$ cd ~/vboxshare/fastapi # Change to the name of the directory where fastapi is installed. $ uvicorn --host 0.0.0.0 main:app --reload
Note that in production environments, you would be strongly advised to run the API over https, as the token
endpoint accepts passwords, which should be encrypted before being sent over a connection.
Step 12: Authenticate via the Swagger URL
Try viewing the API via the Swagger URL at http://YOUR_IP:8000/docs (replace YOUR_IP
with the IP address of the server running the FastAPI API). You should see a new ‘Authorize’ button at the top. If you click it, you should be able to enter your username and password and obtain a JWT token that gets saved to your browser. If you then try making a request via any endpoint, your request will be authenticated by that JWT token.
Step 13: Update Test Script
Now we need to update the test script to test getting a JWT access token and then calling the other endpoints with JWT token authentication.
- Open the file
test.sh
, and replace the code with the following:IP=$1 BASEURL=http://$IP:8000 USERNAME=apiuser echo "==========================" echo "Test GET /tag tag=test1 UNAUTHORIZED. Expect 401 response: Not authenticated" resp=$(curl -X "GET" -H "Content-Type: application/json" $BASEURL/tag/?tag=test1) echo $resp echo "==========================" echo "Test POST /token username=$USERNAME, password=invalid. Expect 401 response: 'Incorrect username or password'" resp=$(curl -X "POST" -H "accept: application/json" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=&username=$USERNAME&password=blah&scope=&client_id=&client_secret=" $BASEURL/token/) echo $resp echo "==========================" echo "Test POST /token username=$USERNAME, password=$PASSWORD" resp=$(curl -X "POST" -H "accept: application/json" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=&username=$USERNAME&password=$PASSWORD&scope=&client_id=&client_secret=" $BASEURL/token/) echo $resp token=$(echo $resp | jq -r '.access_token') echo $token echo "==========================" echo "Test POST /link link=https://www.test1.com, tag=test1" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test1.com\", \"tag\": \"test1\"}" $BASEURL/link/) echo $resp link_id1=$(echo $resp | jq -r '.link_id') echo $link_id1 echo "==========================" echo "Test GET /tag tag=test1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/?tag=test1) echo $resp tag_id1=$(echo $resp | jq -r '.[0].tag_id') echo $tag_id1 echo "==========================" echo "Test POST /tag tag=test2" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"tag\": \"test2\"}" $BASEURL/tag/) echo $resp tag_id2=$(echo $resp | jq -r '.tag_id') echo $tag_id2 echo "==========================" echo "Test POST /tag tag=test3" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"tag\": \"test3\"}" $BASEURL/tag/) echo $resp tag_id3=$(echo $resp | jq -r '.tag_id') echo $tag_id3 echo "==========================" echo "Test POST /link link=https://www.test2.com, tag=test2" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test2.com\", \"tag\": \"test2\"}" $BASEURL/link/) echo $resp link_id2=$(echo $resp | jq -r '.link_id') echo $link_id2 echo "==========================" echo "Test POST /link link=https://www.test2.com, tag_id=$tag_id1" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test2.com\", \"tag_id\": \"$tag_id1\"}" $BASEURL/link/) echo $resp link_id2=$(echo $resp | jq -r '.link_id') echo $link_id2 echo "==========================" echo "Test POST /link link=https://www.test3.com, tag_id=$tag_id3" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test3.com\", \"tag_id\": \"$tag_id3\"}" $BASEURL/link/) echo $resp link_id3=$(echo $resp | jq -r '.link_id') echo $link_id3 echo "==========================" echo "Test POST /link link=https://www.test2.com, tag_id=invalid. Expect 404 response: Tag with tag_id invalid not found" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test2.com\", \"tag_id\": \"invalid\"}" $BASEURL/link/) echo $resp echo "==========================" echo "Test GET /link" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/) echo $resp echo "==========================" echo "Test GET /link tag_id=$tag_id1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/?tag_id=$tag_id1) echo $resp echo "==========================" echo "Test GET /link tag=test1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/?tag=test1) echo $resp echo "==========================" echo "Test GET /link link_id=$link_id1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id1) echo $resp echo "==========================" echo "Test GET /tag tag_id=$tag_id1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id1) echo $resp echo "==========================" echo "Test GET /tag" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id2, link_id=$link_id1" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id1\", \"tag_id\": \"$tag_id2\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id2, link_id=$link_id3" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id3\", \"tag_id\": \"$tag_id2\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=invalid, link_id=$link_id1. Expect 422 response: Tag with tag_id invalid not found" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id1\", \"tag_id\": \"invalid\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id1, link_id=invalid. Expect 422 response: Link with link_id invalid not found" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"invalid\", \"tag_id\": \"$tag_id1\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id1, link_id=$link_id1. Expect 409 response: TagLink with tag_id $tag_id1 and link_id $link_id1 exists" resp=$(curl -X "POST" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id1\", \"tag_id\": \"$tag_id1\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test GET /taglink" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test GET /taglink link_id=$link_id1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?link_id=$link_id1) echo $resp echo "==========================" echo "Test GET /taglink tag_id=$tag_id1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?tag_id=$tag_id1) echo $resp echo "==========================" echo "Test GET /taglink tag_id=$tag_id1 link_id=$link_id1" resp=$(curl -X "GET" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?tag_id=$tag_id1&link_id=$link_id1) echo $resp echo "==========================" echo "Test DELETE /taglink. Expect 422 response: One or both of tag_id or link_id must be specified" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test DELETE /taglink tag_id=$tag_id3" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?tag_id=$tag_id3) echo $resp echo "==========================" echo "Test DELETE /taglink link_id=$link_id3" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?link_id=$link_id3) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=$tag_id1" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id1) echo $resp echo "==========================" echo "Test DELETE /link link_id=$link_id1" resp=$(curl -X "DELETE" -H -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id1) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=$tag_id2" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id2) echo $resp echo "==========================" echo "Test DELETE /link link_id=$link_id2" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id2) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=$tag_id3" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id3) echo $resp echo "==========================" echo "Test DELETE /link link_id=$link_id3" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id3) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=blah. Expect 404 response: Tag with tag_id blah not found" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/blah) echo $resp echo "==========================" echo "Test DELETE /link link_id=blah. Expect 404 response: Link with link_id blah not found" resp=$(curl -X "DELETE" -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/blah) echo $resp
- Make sure the test script is executable:
$ chmod ugo+x test.sh
Step 14: Run the test script
-
Set your password as an environment variable, if you haven’t already. Replace
YOUR_PASSWORD
with the password you created in Step 3. Be sure to keep the single quotes around the password:$ export PASSWORD='YOUR_PASSWORD'
- Run the test script, replacing
YOUR_IP
with the IP of your API host server (or127.0.0.1
if running the test script from the machine that is hosting the API):$ ./test.sh YOUR_IP
If all goes well, you should see an output of json responses to each request, with no errors, other than those expected in the test descriptions.
Conclusion
You should now have a FastAPI API running with five GET endpoints, three POST endpoints and three DELETE endpoints that are authenticated by an oauth2 JWT token.
To add HTTPS and a proxy URL, see my follow-on post, Creating an API with python: Part 6: HTTPS and Proxying.
Thanks for reading!
Recent Comments