Inference
Step 1. Create a directory called inference.
mkdir inference
Step 2. Change your working directory to the directory you just created.
cd inference
Step 3. Create a file called requirements.txt and add the content below to it.
ag-ui-langgraph
azure-identity
copilotkit
fastapi
# Imported directly by squidfall/checkpoint_saver.py.
httpx
langchain
langchain-openai
langchain-mcp-adapters
langgraph
pydantic
uvicorn
Step 4. Create a directory called squidfall within the inference directory.
mkdir squidfall
Step 5. In the squidfall directory you just created, create a file called __init__.py.
Step 6. In the squidfall directory, create a file called checkpoint_saver.py and add the content below to it.
# Standard library imports.
from os import environ
from typing import Any, AsyncIterator, Optional, Sequence, Tuple
# Third party imports.
from httpx import AsyncClient
from langgraph.checkpoint.base import (
BaseCheckpointSaver,
Checkpoint,
CheckpointMetadata,
CheckpointTuple,
get_checkpoint_id,
)
# Base URL of the backend API the checkpoint saver talks to (required; a
# missing variable raises KeyError at import time).
BACKEND_ENDPOINT = environ["BACKEND_ENDPOINT"]
class DjangoCheckpointSaver(BaseCheckpointSaver):
    """Checkpoint saver that stores LangGraph checkpoints via the backend's REST API.

    Checkpoints and their metadata are serialized with ``self.serde``,
    hex-encoded, and exchanged as JSON with the ``/api/v1/chats/`` endpoints
    under ``BACKEND_ENDPOINT``. Only the async interface is implemented; the
    sync methods raise ``NotImplementedError``.
    """

    async def aget_tuple(self, config: dict) -> Optional[CheckpointTuple]:
        """Fetch a checkpoint for the thread named in ``config``.

        Args:
            config: Runnable config; ``config["configurable"]["thread_id"]`` is
                required. If it also carries a checkpoint ID, that ID is sent
                as the ``checkpoint_id`` query parameter.

        Returns:
            The stored checkpoint tuple, or ``None`` if the backend answers 404.

        Raises:
            httpx.HTTPStatusError: If the backend answers with any other
                4xx/5xx status.
        """
        thread_id = config["configurable"]["thread_id"]
        checkpoint_id = get_checkpoint_id(config)
        async with AsyncClient() as client:
            resp = await client.get(
                f"{BACKEND_ENDPOINT}/api/v1/chats/{thread_id}/",
                params={"checkpoint_id": checkpoint_id} if checkpoint_id else {},
            )
        if resp.status_code == 404:
            return None
        # Surface every other failure instead of trying to parse an error body
        # as a checkpoint row.
        resp.raise_for_status()
        return self._row_to_tuple(config, resp.json())

    async def alist(self, config: dict, **kwargs) -> AsyncIterator[CheckpointTuple]:
        """Yield every checkpoint the backend returns for the thread in ``config``.

        Args:
            config: Runnable config; ``config["configurable"]["thread_id"]`` is
                required.
            **kwargs: Accepted for interface compatibility; currently ignored.

        Raises:
            httpx.HTTPStatusError: If the backend answers with a 4xx/5xx status.
        """
        thread_id = config["configurable"]["thread_id"]
        async with AsyncClient() as client:
            resp = await client.get(
                f"{BACKEND_ENDPOINT}/api/v1/chats/", params={"thread_id": thread_id}
            )
        resp.raise_for_status()
        # The response is fully read at this point, so the client is already
        # closed while the consumer iterates.
        for row in resp.json():
            yield self._row_to_tuple(config, row)

    async def aput(
        self,
        config: dict,
        checkpoint: Checkpoint,
        metadata: CheckpointMetadata,
        new_versions: dict,
    ) -> dict:
        """Persist a checkpoint by POSTing it to the backend.

        Args:
            config: Runnable config; supplies ``thread_id``, the optional
                ``checkpoint_ns`` (defaults to ``""``) and, if present, the ID
                of the parent checkpoint.
            checkpoint: The checkpoint to store; its ``"id"`` becomes the
                stored checkpoint ID.
            metadata: Metadata stored alongside the checkpoint.
            new_versions: Unused by this implementation.

        Returns:
            A config pointing at the checkpoint that was just stored.

        Raises:
            httpx.HTTPStatusError: If the backend rejects the request, so a
                failed save is never reported as a success.
        """
        thread_id = config["configurable"]["thread_id"]
        checkpoint_ns = config["configurable"].get("checkpoint_ns", "")
        checkpoint_id = checkpoint["id"]
        parent_checkpoint_id = get_checkpoint_id(config)
        ctype, serialized = self.serde.dumps_typed(checkpoint)
        mtype, meta_serialized = self.serde.dumps_typed(metadata)
        async with AsyncClient() as client:
            resp = await client.post(
                f"{BACKEND_ENDPOINT}/api/v1/chats/",
                json={
                    "thread_id": thread_id,
                    "checkpoint_ns": checkpoint_ns,
                    "checkpoint_id": checkpoint_id,
                    "parent_checkpoint_id": parent_checkpoint_id,
                    "type": ctype,
                    # Serialized payloads are bytes; hex-encode them for JSON.
                    "checkpoint": serialized.hex(),
                    "metadata_type": mtype,
                    "metadata": meta_serialized.hex(),
                },
            )
        resp.raise_for_status()
        return {
            "configurable": {
                "thread_id": thread_id,
                "checkpoint_ns": checkpoint_ns,
                "checkpoint_id": checkpoint_id,
            }
        }

    async def aput_writes(
        self,
        config: dict,
        writes: Sequence[Tuple[str, Any]],
        task_id: str,
        task_path: str = "",
    ) -> None:
        """Accept intermediate writes without storing them (deliberate no-op).

        ``task_path`` is optional so callers that pass it and callers that do
        not are both supported.
        """
        pass

    def get_tuple(self, config: dict) -> Optional[CheckpointTuple]:
        """Sync variant; not implemented, use ``aget_tuple``."""
        raise NotImplementedError

    def list(self, config: dict, **kwargs):
        """Sync variant; not implemented, use ``alist``."""
        raise NotImplementedError

    def put(self, config, checkpoint, metadata, new_versions):
        """Sync variant; not implemented, use ``aput``."""
        raise NotImplementedError

    def put_writes(self, config, writes, task_id, task_path=""):
        """Sync variant; not implemented, use ``aput_writes``."""
        raise NotImplementedError

    def _row_to_tuple(self, config: dict, row: dict) -> CheckpointTuple:
        """Convert one backend JSON row into a ``CheckpointTuple``.

        ``config`` is unused; the returned configs are built from the row
        itself. The hex-encoded ``checkpoint`` and ``metadata`` fields are
        decoded and deserialized with ``self.serde``. ``parent_config`` is
        ``None`` when the row has no (or an empty) ``parent_checkpoint_id``.
        """
        return CheckpointTuple(
            config={
                "configurable": {
                    "thread_id": row["thread_id"],
                    "checkpoint_ns": row["checkpoint_ns"],
                    "checkpoint_id": row["checkpoint_id"],
                }
            },
            checkpoint=self.serde.loads_typed(
                (row["type"], bytes.fromhex(row["checkpoint"]))
            ),
            metadata=self.serde.loads_typed(
                (row["metadata_type"], bytes.fromhex(row["metadata"]))
            ),
            parent_config=(
                {
                    "configurable": {
                        "thread_id": row["thread_id"],
                        "checkpoint_ns": row["checkpoint_ns"],
                        "checkpoint_id": row["parent_checkpoint_id"],
                    }
                }
                if row.get("parent_checkpoint_id")
                else None
            ),
        )
Step 7. In the squidfall directory, create a directory called model_providers.
mkdir squidfall/model_providers
Step 8. In the model_providers directory you just created, create a file called __init__.py.
Step 9. In the model_providers directory, create a file called openai.py and add the content below to it.
# Standard library imports.
from os import environ
def get_openai_model():
    """Return a ``ChatOpenAI`` model configured from the environment.

    Reads ``OPENAI_MODEL`` and ``OPENAI_API_KEY``; a missing variable raises
    ``KeyError``.
    """
    # Third party imports.
    from langchain_openai import ChatOpenAI

    # Get environment variables and build the model in one step.
    return ChatOpenAI(
        model=environ["OPENAI_MODEL"],
        api_key=environ["OPENAI_API_KEY"],
    )
def get_openai_model_from_azure():
    """Return an ``AzureChatOpenAI`` model authenticated with Azure credentials.

    Reads ``AZURE_AUTHORITY_HOSTS``, ``AZURE_TOKEN_SCOPES``,
    ``AZURE_OPENAI_ENDPOINT``, ``AZURE_OPENAI_DEPLOYMENT`` and
    ``AZURE_OPENAI_API_VERSION`` from the environment; a missing variable
    raises ``KeyError`` before any credential is created.
    """
    # Third party imports.
    from azure.identity import DefaultAzureCredential, get_bearer_token_provider
    from langchain_openai import AzureChatOpenAI

    # Get environment variables.
    authority = environ["AZURE_AUTHORITY_HOSTS"]
    token_scopes = environ["AZURE_TOKEN_SCOPES"]
    endpoint = environ["AZURE_OPENAI_ENDPOINT"]
    deployment = environ["AZURE_OPENAI_DEPLOYMENT"]
    api_version = environ["AZURE_OPENAI_API_VERSION"]

    # Authenticate with Azure.
    credential = DefaultAzureCredential(authority=authority)

    # Get an authorization token provider.
    token_provider = get_bearer_token_provider(credential, token_scopes)

    return AzureChatOpenAI(
        azure_endpoint=endpoint,
        azure_deployment=deployment,
        api_version=api_version,
        azure_ad_token_provider=token_provider,
    )
Step 10. In the squidfall directory, create a file called main.py and add the content below to it.
# Standard library imports.
from contextlib import asynccontextmanager
from os import environ
# Third party imports.
from ag_ui_langgraph import add_langgraph_fastapi_endpoint
from copilotkit import LangGraphAGUIAgent
from fastapi import FastAPI
from langchain_mcp_adapters.client import MultiServerMCPClient
from langchain.agents import create_agent
# Local imports.
from squidfall.checkpoint_saver import DjangoCheckpointSaver
from squidfall.model_providers.openai import (
get_openai_model,
get_openai_model_from_azure,
)
# Get environment variables.
MODEL_PROVIDER = environ["MODEL_PROVIDER"]
TOOLS_ENDPOINT = environ["TOOLS_ENDPOINT"]
# Get a model handler.
match MODEL_PROVIDER:
case "openai":
model = get_openai_model()
case "azure_openai":
model = get_openai_model_from_azure()
case _:
print("Invalid MODEL_PROVIDER (options: openai or azure_openai).")
exit(1)
# Identify the tools the agent has available.
async def get_tools():
    """Load the tools from the MCP server configured at ``TOOLS_ENDPOINT``."""
    server_config = {
        "transport": "http",
        "url": TOOLS_ENDPOINT,
    }
    mcp_client = MultiServerMCPClient({"squidfall": server_config})
    return await mcp_client.get_tools()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Build the agent and register its endpoint when the server starts.

    Runs before the application begins serving: loads the tools, creates the
    agent with a ``DjangoCheckpointSaver`` checkpointer, and mounts it on
    ``app`` under ``/api/v1``. Nothing is torn down after the ``yield``.
    """
    tools = await get_tools()
    agent = create_agent(
        model=model,
        system_prompt="You are a helpful assistant.",
        checkpointer=DjangoCheckpointSaver(),
        tools=tools,
    )
    add_langgraph_fastapi_endpoint(
        app=app,
        agent=LangGraphAGUIAgent(
            name="squidfall",
            description="An agent.",
            graph=agent,
        ),
        path="/api/v1",
    )
    yield


# Init a FastAPI server. The lifespan handler is passed through the
# constructor rather than assigned to the router's attribute afterwards.
api = FastAPI(lifespan=lifespan)
Step 11. In the inference directory, create a file called .env and add the content below to it.
# App-specific variables.
export TOOLS_ENDPOINT="http://squidfall-tools:8002/mcp"
export BACKEND_ENDPOINT="http://squidfall-backend:8000"
If you're using the Azure Government offering of OpenAI's models, append the content below to the .env file.
# Model provider.
export MODEL_PROVIDER="azure_openai"
# Cloud-specific variables.
export AZURE_AUTHORITY_HOSTS="login.microsoftonline.us"
# Resource scope of the authorization token requested.
export AZURE_TOKEN_SCOPES="https://cognitiveservices.azure.us/.default"
# Tenant-specific variables.
export AZURE_TENANT_ID="xxx"
# Subscription-specific variables.
export AZURE_CLIENT_ID="xxx"
export AZURE_CLIENT_SECRET="xxx"
# Endpoint-specific variables.
export AZURE_OPENAI_ENDPOINT="https://<SERVICE>.openai.azure.us/"
export AZURE_OPENAI_DEPLOYMENT="squidfall"
export AZURE_OPENAI_API_VERSION="<YYYY-MM-DD>"
Otherwise, append the content below to the .env file.
# Model provider.
export MODEL_PROVIDER="openai"
# Endpoint-specific variables.
export OPENAI_MODEL="gpt-4o"
export OPENAI_API_KEY="xxx"
Step 12. In the inference directory, create a file called Dockerfile and add the content below to it. Feel free to modify the image.authors label.
FROM alpine:3.23
LABEL image.authors="Victor Fernandez III, @cyberphor"
WORKDIR /home/inference/
# Copy only the requirements first so the dependency layer below stays cached
# when just the application code changes.
COPY requirements.txt requirements.txt
RUN apk add --no-cache --update python3 py3-pip uvicorn &&\
    pip install --break-system-packages -r requirements.txt &&\
    adduser -D squidfall -h /home/inference/ &&\
    chown -R squidfall:squidfall /home/inference/
# Copy the application code last, already owned by the unprivileged user.
COPY --chown=squidfall:squidfall squidfall/ squidfall/
USER squidfall
EXPOSE 8001
CMD [ "uvicorn", "squidfall.main:api", "--host", "0.0.0.0", "--port", "8001" ]
Step 13. From the root of the repository, run the command below to build the inference container.
make DOCKER_COMPOSE_PROFILE=inference
Step 14. Run the command below to start the container you just created.
make DOCKER_COMPOSE_PROFILE=inference start
Step 15. Run the command below to confirm the container has started. If it hasn't started (or it failed), re-run the previous command to restart it.
make DOCKER_COMPOSE_PROFILE=inference status
Step 16. If the container has started, run the command below to interact with it.
curl localhost:8001/api/v1/health && echo
You should get output similar to the output below.
{"status":"ok","agent":{"name":"squidfall"}}
Step 17. Leave all the containers (database, backend, tools, and inference) running.