In my previous post, Creating an API with python: Part 7: CORS, I added CORS (Cross-Origin Resource Scripting) support to the FastAPI API in order that it can be called from a browser where the website host is not the same as that of the API. In this post, I will add support for multiple accounts, with each account owning its own set of tags and links.
Prerequisites
These prerequisites are assumed for this post:
- Creating an API with python: Part 1: GET Endpoints
- Creating an API with python: Part 2: MariaDB Database
- Creating an API with python: Part 3: POST Endpoints
- Creating an API with python: Part 4: DELETE Endpoints
- Creating an API with python: Part 5: Authentication
- Creating an API with python: Part 6: HTTPS and Proxying
- Creating an API with python: Part 7: CORS
Step 1: Update/Add Database Tables
Add the account
database table and update the link
, tag
and taglink
tables to have an account_id
column.
-
From the API server, connect to the database:
$ mysql -u root -p
- Switch to the
apiservice
database:MariaDB [(none)]> use apiservice;
-
Create the
account
table:MariaDB [apiservice]> CREATE TABLE IF NOT EXISTS account (account_id CHAR(36) NOT NULL, email VARCHAR(255), hashed_password VARCHAR(255), created DATETIME DEFAULT UTC_TIMESTAMP(), PRIMARY KEY (account_id), UNIQUE KEY (email));
-
Update the
taglink
table with theaccount_id
column, indices and foreign key constraint (note that this will remove any existing data from the table):MariaDB [apiservice]> TRUNCATE TABLE taglink; MariaDB [apiservice]> ALTER TABLE taglink ADD COLUMN IF NOT EXISTS account_id CHAR(36) NOT NULL AFTER link_id; MariaDB [apiservice]> ALTER TABLE taglink ADD INDEX(account_id, tag_id); MariaDB [apiservice]> ALTER TABLE taglink ADD INDEX(account_id, link_id); MariaDB [apiservice]> ALTER TABLE taglink ADD CONSTRAINT FOREIGN KEY (account_id) REFERENCES account (account_id);
-
Update the
link
table with theaccount_id
column, index and foreign key constraint (note that this will remove any existing data from the table):MariaDB [apiservice]> TRUNCATE TABLE link; MariaDB [apiservice]> ALTER TABLE link ADD COLUMN IF NOT EXISTS account_id CHAR(36) NOT NULL AFTER link_id; MariaDB [apiservice]> ALTER TABLE link ADD INDEX(account_id); MariaDB [apiservice]> ALTER TABLE link ADD CONSTRAINT FOREIGN KEY (account_id) REFERENCES account (account_id);
-
Update the
tag
table with theaccount_id
column, indices and foreign key constraint (note that this will remove any existing data from the table):MariaDB [apiservice]> TRUNCATE TABLE tag; MariaDB [apiservice]> ALTER TABLE tag ADD COLUMN IF NOT EXISTS account_id CHAR(36) NOT NULL AFTER tag_id; MariaDB [apiservice]> ALTER TABLE tag ADD UNIQUE KEY (account_id, tag); MariaDB [apiservice]> ALTER TABLE tag ADD CONSTRAINT FOREIGN KEY (account_id) REFERENCES account (account_id);
Step 2: Update main.py
Update main.py
with new /account
endpoints, and update the existing endpoints to use account_id
.
- Open the
main.py
file and replace the code with the following:from typing import Optional from datetime import timedelta, datetime from sqlalchemy.orm import Session from fastapi import FastAPI, HTTPException, Depends, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from fastapi.middleware.cors import CORSMiddleware from manager import manager, schemas, authentication, CONFIG from manager.database import get_db app = FastAPI() origins = [origin for origin in CONFIG['origins']] app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Get link by link_id @app.get("/link/{link_id}") async def get_link(link_id: str, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") db_link = manager.get_link(db, link_id) if db_link is None: raise HTTPException(status_code=404, detail="Link not found") return db_link # Get links by query params @app.get("/link/") async def get_links(tag_id: Optional[str] = None, tag: Optional[str] = None, account_id: Optional[str] = None, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") return manager.get_links(db, tag_id, tag, account_id) # Get tag by tag_id @app.get("/tag/{tag_id}") async def get_tag(tag_id: str, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") db_tag = manager.get_tag(db, tag_id) if db_tag is None: raise HTTPException(status_code=404, detail="Tag not found") return db_tag # Get tags by query params @app.get("/tag/") async def get_tags(tag: Optional[str] = None, account_id: Optional[str] = None, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") return manager.get_tags(db, tag, account_id) # Get taglinks by query params @app.get("/taglink/") async def get_taglinks(link_id: Optional[str] = None, tag_id: Optional[str] = None, account_id: Optional[str] = None, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") return manager.get_taglinks(db, tag_id, link_id, account_id) # Post a link @app.post("/link/") async def post_link(link: schemas.PostLink, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") if link.tag is None and link.tag_id is None: raise HTTPException(status_code=422, detail="One of tag_id or tag must be specified") if link.tag is not None and link.tag_id is not None: raise HTTPException(status_code=422, detail="Only one of tag_id or tag must be specified") db_link = manager.create_link(db, link) return db_link # Post a tag @app.post("/tag/") async def post_tag(tag: schemas.PostTag, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") db_tag = manager.create_tag(db, tag) return db_tag # Post a taglink @app.post("/taglink/") async def post_taglink(taglink: schemas.PostTagLink, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") db_tag = manager.create_taglink(db, taglink) return db_tag # Delete link by link_id @app.delete("/link/{link_id}") async def delete_link(link_id: str, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") return manager.delete_link(db, link_id) # Delete tag by tag_id @app.delete("/tag/{tag_id}") async def delete_tag(tag_id: str, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") return manager.delete_tag(db, tag_id) # Delete taglinks by query params @app.delete("/taglink/") async def delete_taglinks(link_id: Optional[str] = None, tag_id: Optional[str] = None, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") if link_id is None and tag_id is None: raise HTTPException(status_code=422, detail="One or both of tag_id and link_id must be specified") return manager.delete_taglinks(db, tag_id, link_id) @app.post("/token/", response_model=schemas.Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)): user = authentication.authenticate_user(db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"}, ) access_token_expires = timedelta(minutes=authentication.ACCESS_TOKEN_EXPIRE_MINUTES) access_token = authentication.create_access_token( data={"sub": user.username}, expires_delta=access_token_expires ) expires = datetime.utcnow() + access_token_expires return {"access_token": access_token, "token_type": "bearer", "expires": expires.isoformat()} # Create a new account @app.post("/account/") async def post_account(account: schemas.PostAccount, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") db_account = manager.create_account(db, account) return db_account # Get accounts by query params @app.get("/account/") async def get_accounts(email: Optional[str] = None, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") return manager.get_accounts(db, email) # Get account by account_id @app.get("/account/{account_id}") async def get_account(account_id: str, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") db_account = manager.get_account(db, account_id) if db_account is None: raise HTTPException(status_code=404, detail="Account not found") return db_account # Delete account by account_id @app.delete("/account/{account_id}") async def delete_account(account_id: str, db: Session = Depends(get_db), current_user: schemas.User = Depends(authentication.get_current_active_user)): print(f"authenticated as {current_user.username}") return manager.delete_account(db, account_id)
Step 3: Update schemas.py
Update schemas.py
with a new schema class PostAccount
, and add account_id
fields to the other classes.
- Change directory to the manager directory:
$ cd manager
- Open the
schemas.py
file and replace the code with the following:from typing import Optional, Union from pydantic import BaseModel, Field class PostLink(BaseModel): link: str = Field(..., description="The link URL") tag: Optional[str] = Field(None, description="Tag name to associate with the link (will be created if it doesn't exist)") tag_id: Optional[str] = Field(None, description="Tag ID to associate with the link (must already exist)") account_id: str = Field(..., description="The account ID") class PostTag(BaseModel): tag: str = Field(..., description="Tag name") account_id: str = Field(..., description="The account ID") class PostTagLink(BaseModel): tag_id: str = Field(..., description="Tag ID (must already exist)") link_id: str = Field(..., description="Link ID (must already exist)") account_id: str = Field(..., description="The account ID") class PostAccount(BaseModel): email: str = Field(..., description="Email Address for the new account") password: str = Field(..., description="Password for the new account") class Token(BaseModel): access_token: str token_type: str expires: str class TokenData(BaseModel): username: Union[str, None] = None class User(BaseModel): user_id: str username: str class Config: orm_mode = True
Step 4: Update models.py
Update models.py
with a new models class Account
, and add account_id
fields to the other classes.
- Open the
models.py
file and replace the code with the following:from sqlalchemy import Column, ForeignKey, String, UniqueConstraint from manager.database import Base class Link(Base): __tablename__ = "link" link_id = Column(String, primary_key=True, index=True) account_id = Column(String, ForeignKey("account.account_id"), index=True) link = Column(String) class Tag(Base): __tablename__ = "tag" tag_id = Column(String, primary_key=True, index=True) account_id = Column(String, ForeignKey("account.account_id"), index=True) tag = Column(String) class TagLink(Base): __tablename__ = "taglink" tag_id = Column(String, ForeignKey("tag.tag_id"), primary_key=True, index=True) link_id = Column(String, ForeignKey("link.link_id"), primary_key=True, index=True) account_id = Column(String, ForeignKey("account.account_id"), index=True) class User(Base): __tablename__ = "user" user_id = Column(String, primary_key=True, index=True) username = Column(String) hashed_password = Column(String) class Account(Base): __tablename__ = "account" account_id = Column(String, primary_key=True, index=True) email = Column(String) hashed_password = Column(String) created = Column(String)
Step 5: Update manager.py
Update manager.py
with new methods for handling accounts, and add account_id
to the parameters for other methods.
- Open the
manager.py
file and replace the code with the following:from typing import Optional from datetime import datetime from uuid import uuid4 from sqlalchemy.orm import Session from fastapi import HTTPException from manager import models, schemas, authentication def get_link(db: Session, link_id: str, account_id: Optional[str] = None): filters = [models.Link.link_id == link_id] if account_id is not None: filters.append(models.Link.account_id == account_id) return db.query(models.Link).filter(*filters).first() def get_tag(db: Session, tag_id: str, account_id: Optional[str] = None): filters = [models.Tag.tag_id == tag_id] if account_id is not None: filters.append(models.Tag.account_id == account_id) return db.query(models.Tag).filter(*filters).first() def get_tag_by_tag_name(db: Session, tag: str, account_id: Optional[str]): filters = [models.Tag.tag == tag] if account_id is not None: filters.append(models.Tag.account_id == account_id) return db.query(models.Tag).filter(*filters).first() def get_links(db: Session, tag_id: Optional[str] = None, tag: Optional[str] = None, account_id: Optional[str] = None): filters = [] if tag is None and tag_id is None and account_id is None: # TODO: Implement offset and limit return db.query(models.Link).all() if tag is not None: tag_record = get_tag_by_tag_name(db, tag, account_id) if tag_record is None: return [] tag_id = tag_record.tag_id if account_id is not None: filters.append(models.Link.account_id == account_id) if tag_id is not None: filters.append(models.TagLink.tag_id == tag_id) return db.query(models.Link).join(models.TagLink).filter(*filters).all() return db.query(models.Link).filter(*filters).all() def get_tags(db: Session, tag: Optional[str] = None, account_id: Optional[str] = None): filters = [] if tag is None and account_id is None: # TODO: Implement offset and limit return db.query(models.Tag).all() if tag is not None: filters.append(models.Tag.tag == tag) if account_id is not None: filters.append(models.Tag.account_id == account_id) return db.query(models.Tag).filter(*filters).all() def get_taglinks(db: Session, tag_id: Optional[str] = None, link_id: Optional[str] = None, account_id: Optional[str] = None): filters = [] if tag_id is None and link_id is None and account_id is None: # TODO: Implement offset and limit return db.query(models.TagLink).all() if tag_id is not None: filters.append(models.TagLink.tag_id == tag_id) if link_id is not None: filters.append(models.TagLink.link_id == link_id) if account_id is not None: filters.append(models.TagLink.account_id == account_id) return db.query(models.TagLink).filter(*filters).all() def get_accounts(db: Session, email: Optional[str] = None): filters = [] if email is None: # TODO: Implement offset and limit return db.query(models.Account).all() if email is not None: filters.append(models.Account.email == email) return db.query(models.Account).filter(*filters).all() def get_account(db: Session, account_id: str): filters = [models.Account.account_id == account_id] return db.query(models.Account).filter(*filters).first() def create_link(db: Session, link: schemas.PostLink): db_link = models.Link(link_id=str(uuid4()), link=link.link, account_id=link.account_id) db.add(db_link) tag_id = link.tag_id link_id = db_link.link_id if link.tag is not None: db_tag = get_tag_by_tag_name(db, link.tag, link.account_id) if db_tag is None: db_tag = models.Tag(tag_id=str(uuid4()), tag=link.tag, account_id=link.account_id) db.add(db_tag) tag_id = db_tag.tag_id else: tag_id = db_tag.tag_id elif tag_id is not None: db_tag = get_tag(db, tag_id, link.account_id) if db_tag is None: raise HTTPException(status_code=404, detail=f"Tag with tag_id {tag_id} not found for account_id {link.account_id}") db_taglink = models.TagLink(link_id=link_id, tag_id=tag_id, account_id=link.account_id) db.add(db_taglink) db.commit() db.refresh(db_link) return db_link def create_tag(db: Session, tag: schemas.PostTag): db_tag = models.Tag(tag_id=str(uuid4()), tag=tag.tag, account_id=tag.account_id) db_tag_existing = get_tags(db, tag=tag.tag, account_id=tag.account_id) if len(db_tag_existing) > 0: raise HTTPException(status_code=409, detail=f"Tag with name {tag.tag} exists for account {tag.account_id}") db.add(db_tag) db.commit() db.refresh(db_tag) return db_tag def create_taglink(db: Session, taglink: schemas.PostTagLink): tag_id = taglink.tag_id link_id = taglink.link_id account_id = taglink.account_id db_taglink = models.TagLink(tag_id=tag_id, link_id=link_id, account_id=account_id) db_tag_id_existing = get_tag(db, tag_id, account_id) if db_tag_id_existing is None: raise HTTPException(status_code=422, detail=f"Tag with tag_id {tag_id} not found for account_id {account_id}") db_link_id_existing = get_link(db, link_id, account_id) if db_link_id_existing is None: raise HTTPException(status_code=422, detail=f"Link with link_id {link_id} not found for account_id {account_id}") db_taglink_existing = get_taglinks(db, tag_id=tag_id, link_id=link_id, account_id=account_id) if len(db_taglink_existing) > 0: raise HTTPException(status_code=409, detail=f"TagLink with tag_id {tag_id} and link_id {link_id} exists") db.add(db_taglink) db.commit() db.refresh(db_taglink) return db_taglink def delete_link(db: Session, link_id: str, account_id: Optional[str] = None): db_link = get_link(db, link_id=link_id, account_id=account_id) if not db_link: raise HTTPException(status_code=404, detail=f"Link with link_id {link_id} not found") delete_taglinks(db, link_id=link_id) db.delete(db_link) db.commit() return "OK" def delete_tag(db: Session, tag_id: str, account_id: Optional[str] = None): db_tag = get_tag(db, tag_id=tag_id, account_id=account_id) if not db_tag: raise HTTPException(status_code=404, detail=f"Tag with tag_id {tag_id} not found") delete_taglinks(db, tag_id=tag_id) db.delete(db_tag) db.commit() return "OK" def delete_taglinks(db: Session, tag_id: Optional[str] = None, link_id: Optional[str] = None, account_id: Optional[str] = None): db_taglinks = get_taglinks(db, tag_id=tag_id, link_id=link_id, account_id=account_id) for db_taglink in db_taglinks: db.delete(db_taglink) db.commit() return "OK" def get_account_from_email(db: Session, email: str): return db.query(models.Account).filter(models.Account.email == email).first() def create_account(db: Session, account: schemas.PostAccount): hashed_password = authentication.get_password_hash(account.password) now = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S') db_account = models.Account(account_id=str(uuid4()), email=account.email, hashed_password=hashed_password, created=now) db_account_existing = get_account_from_email(db, email=account.email) if db_account_existing is not None: raise HTTPException(status_code=409, detail=f"Account with email {account.email} exists") db.add(db_account) db.commit() db.refresh(db_account) return db_account def delete_account(db: Session, account_id: str): db_account = get_account(db, account_id=account_id) if db_account is None: raise HTTPException(status_code=404, detail=f"Account id {account_id} not found") delete_taglinks(db, account_id=account_id) tags = get_tags(db, account_id=account_id) for db_tag in tags: db.delete(db_tag) db.commit() links = get_links(db, account_id=account_id) for db_link in links: print(db_link) db.delete(db_link) db.commit() db.delete(db_account) db.commit() return "OK"
Step 6: Start FastAPI
- On your server, change to the code directory (
~/vboxshare/fastapi
should be replaced with the path to your FastAPI python code):$ cd ~/vboxshare/fastapi
-
Run the FastAPI server:
$ . ~/.venv-fastapi/bin/activate (.venv-fastapi) $ uvicorn --host 0.0.0.0 main:app --root-path /api --reload
Step 7: Update Test Script
Update the script for testing the API.
-
Open the
test.sh
script and replace the code with the following:IP=$1 BASEURL=https://$IP/api CACERT="/etc/ssl/certs/rootCA.crt" echo "==========================" echo "Test GET /tag tag=test1 UNAUTHORIZED. Expect 401 response: Not authenticated" resp=$(curl -X "GET" --cacert $CACERT -H "Content-Type: application/json" $BASEURL/tag/?tag=test1) echo $resp echo "==========================" echo "Test POST /token username=$USERNAME, password=invalid. Expect 401 response: 'Incorrect username or password'" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=&username=$USERNAME&password=blah&scope=&client_id=&client_secret=" $BASEURL/token/) echo $resp echo "==========================" echo "Test POST /token username=$USERNAME, password=$PASSWORD" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=&username=$USERNAME&password=$PASSWORD&scope=&client_id=&client_secret=" $BASEURL/token/) echo $resp token=$(echo $resp | jq -r '.access_token') echo $token echo "==========================" echo "Test POST /account email=test@test1.com, password=testpass1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"email\": \"test@test1.com\", \"password\": \"testpass1\"}" $BASEURL/account/) echo $resp account_id1=$(echo $resp | jq -r '.account_id') echo $account_id1 echo "==========================" echo "Test POST /account email=test@test2.com, password=testpass2" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"email\": \"test@test2.com\", \"password\": \"testpass2\"}" $BASEURL/account/) echo $resp account_id2=$(echo $resp | jq -r '.account_id') echo $account_id2 echo "==========================" echo "Test GET /account account_id=$account_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/account/$account_id) echo $resp echo "==========================" echo "Test GET /account email=test@test2.com" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/account/?email=test@test2.com) echo $resp echo "==========================" echo "Test POST /link link=https://www.test1.com, tag=test1, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test1.com\", \"tag\": \"test1\", \"account_id\": \"$account_id1\"}" $BASEURL/link/) echo $resp link_id1=$(echo $resp | jq -r '.link_id') echo $link_id1 echo "==========================" echo "Test GET /tag tag=test1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/?tag=test1) echo $resp tag_id1=$(echo $resp | jq -r '.[0].tag_id') echo $tag_id1 echo "==========================" echo "Test GET /tag tag=test1, account_id=$account_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/?tag=test1&account_id=$account_id1) echo $resp tag_id1=$(echo $resp | jq -r '.[0].tag_id') echo $tag_id1 echo "==========================" echo "Test POST /tag tag=test2, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"tag\": \"test2\", \"account_id\": \"$account_id1\"}" $BASEURL/tag/) echo $resp tag_id2=$(echo $resp | jq -r '.tag_id') echo $tag_id2 echo "==========================" echo "Test POST /tag tag=test3, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"tag\": \"test3\", \"account_id\": \"$account_id1\"}" $BASEURL/tag/) echo $resp tag_id3=$(echo $resp | jq -r '.tag_id') echo $tag_id3 echo "==========================" echo "Test POST /tag tag=test4, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"tag\": \"test4\", \"account_id\": \"$account_id1\"}" $BASEURL/tag/) echo $resp tag_id4_1=$(echo $resp | jq -r '.tag_id') echo $tag_id4_1 echo "==========================" echo "Test POST /tag tag=test4, account_id=$account_id2" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"tag\": \"test4\", \"account_id\": \"$account_id2\"}" $BASEURL/tag/) echo $resp tag_id4_2=$(echo $resp | jq -r '.tag_id') echo $tag_id4_2 echo "==========================" echo "Test POST /link link=https://www.test2.com, tag=test2, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test2.com\", \"tag\": \"test2\", \"account_id\": \"$account_id1\"}" $BASEURL/link/) echo $resp link_id2=$(echo $resp | jq -r '.link_id') echo $link_id2 echo "==========================" echo "Test POST /link link=https://www.test2.com, tag_id=$tag_id1, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test2.com\", \"tag_id\": \"$tag_id1\", \"account_id\": \"$account_id1\"}" $BASEURL/link/) echo $resp link_id2=$(echo $resp | jq -r '.link_id') echo $link_id2 echo "==========================" echo "Test POST /link link=https://www.test3.com, tag_id=$tag_id3, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test3.com\", \"tag_id\": \"$tag_id3\", \"account_id\": \"$account_id1\"}" $BASEURL/link/) echo $resp link_id3=$(echo $resp | jq -r '.link_id') echo $link_id3 echo "==========================" echo "Test POST /link link=https://www.test4.com, tag_id=$tag_id4_2, account_id=$account_id2" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test3.com\", \"tag_id\": \"$tag_id4_2\", \"account_id\": \"$account_id2\"}" $BASEURL/link/) echo $resp link_id4=$(echo $resp | jq -r '.link_id') echo $link_id4 echo "==========================" echo "Test POST /link link=https://www.test2.com, tag_id=invalid, account_id=$account_id1. Expect 404 response: Tag with tag_id invalid not found for account_id $account_id" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link\": \"https://www.test2.com\", \"tag_id\": \"invalid\", \"account_id\": \"$account_id1\"}" $BASEURL/link/) echo $resp echo "==========================" echo "Test GET /link" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/) echo $resp echo "==========================" echo "Test GET /link tag_id=$tag_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/?tag_id=$tag_id1) echo $resp echo "==========================" echo "Test GET /link tag=test1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/?tag=test1) echo $resp echo "==========================" echo "Test GET /link account_id=$account_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/?account_id=$account_id1) echo $resp echo "==========================" echo "Test GET /link link_id=$link_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id1) echo $resp echo "==========================" echo "Test GET /tag tag_id=$tag_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id1) echo $resp echo "==========================" echo "Test GET /tag" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id2, link_id=$link_id1, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id1\", \"tag_id\": \"$tag_id2\", \"account_id\": \"$account_id1\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id2, link_id=$link_id3, account_id=$account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id3\", \"tag_id\": \"$tag_id2\", \"account_id\": \"$account_id1\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=invalid, link_id=$link_id1, account_id=$account_id1. Expect 422 response: Tag with tag_id invalid not found for account_id $account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id1\", \"tag_id\": \"invalid\", \"account_id\": \"$account_id1\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id1, link_id=invalid, account_id=$account_id1. Expect 422 response: Link with link_id invalid not found for account_id $account_id1" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"invalid\", \"tag_id\": \"$tag_id1\", \"account_id\": \"$account_id1\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test POST /taglink tag_id=$tag_id1, link_id=$link_id1, account_id=$account_id1. Expect 409 response: TagLink with tag_id $tag_id1 and link_id $link_id1 exists" resp=$(curl -X "POST" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" -d "{\"link_id\": \"$link_id1\", \"tag_id\": \"$tag_id1\", \"account_id\": \"$account_id1\"}" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test GET /taglink" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test GET /taglink link_id=$link_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?link_id=$link_id1) echo $resp echo "==========================" echo "Test GET /taglink tag_id=$tag_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?tag_id=$tag_id1) echo $resp echo "==========================" echo "Test GET /taglink account_id=$account_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?account_id=$account_id1) echo $resp echo "==========================" echo "Test GET /taglink tag_id=$tag_id1 link_id=$link_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?tag_id=$tag_id1&link_id=$link_id1) echo $resp echo "==========================" echo "Test GET /taglink tag_id=$tag_id1 link_id=$link_id1 account_id=$account_id1" resp=$(curl -X "GET" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?tag_id=$tag_id1&link_id=$link_id1&account_id=$account_id1) echo $resp echo "==========================" echo "Test DELETE /taglink. Expect 422 response: One or both of tag_id or link_id must be specified" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/) echo $resp echo "==========================" echo "Test DELETE /taglink tag_id=$tag_id3" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?tag_id=$tag_id3) echo $resp echo "==========================" echo "Test DELETE /taglink link_id=$link_id3" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/taglink/?link_id=$link_id3) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=$tag_id1" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id1) echo $resp echo "==========================" echo "Test DELETE /link link_id=$link_id1" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id1) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=$tag_id2" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id2) echo $resp echo "==========================" echo "Test DELETE /link link_id=$link_id2" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id2) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=$tag_id3" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/$tag_id3) echo $resp echo "==========================" echo "Test DELETE /link link_id=$link_id3" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/$link_id3) echo $resp echo "==========================" echo "Test DELETE /tag tag_id=blah. Expect 404 response: Tag with tag_id blah not found" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/tag/blah) echo $resp echo "==========================" echo "Test DELETE /link link_id=blah. Expect 404 response: Link with link_id blah not found" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/link/blah) echo $resp echo "==========================" echo "Test DELETE /account account_id=blah. Expect 404 response: Account id blah not found" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/account/blah) echo $resp echo "==========================" echo "Test DELETE /account account_id=$account_id1" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/account/$account_id1) echo $resp echo "==========================" echo "Test DELETE /account account_id=$account_id2" resp=$(curl -X "DELETE" --cacert $CACERT -H "accept: application/json" -H "Authorization: Bearer $token" -H "Content-Type: application/json" $BASEURL/account/$account_id2) echo $resp
- Make sure the test script is executable:
$ chmod ugo+x test.sh
Step 8: Run the test script
-
Set your password as an environment variable. Replace
YOUR_PASSWORD
with the password you created in Part 5. Be sure to keep the single quotes around the password:$ export PASSWORD='YOUR_PASSWORD'
- Run the test script, replacing
YOUR_IP
with the IP of your API host server (or127.0.0.1
if running the test script from the machine that is hosting the API). You must make sure that the CACerts you created in Part 6 are available locally on the path/etc/ssl/certs/rootCA.crt
(they should be if running from the API host machine):$ ./test.sh YOUR_IP
If all goes well, you should see an output of json responses to each request, with no errors, other than those expected in the test descriptions.
Conclusion
You should now have a FastAPI API running with multi-account support, which means that tags and links can now belong to separate accounts.
If you want to find out how to add Authentication Scopes to the API, see my follow-on post, Creating an API with python: Part 9: Authentication Scopes.
Thanks for reading!
Recent Comments