📚Academy
likeone
online

Build AI Workflow

Assemble a workflow from components: trigger, AI classify, filter, transform, and action. Then simulate data flowing through it.

Workflow Design Principles

Before building any workflow, you need to understand the design principles that separate reliable production systems from fragile scripts. These principles apply whether you are using Make.com, n8n, Zapier, or writing custom code.

Single Responsibility

Each step in your workflow should do exactly one thing. A step that classifies AND routes AND sends a notification is doing three things. Break it into three steps. When one fails, you know exactly where the problem is, and you can retry just that step without re-running the whole pipeline.

Data Contracts Between Steps

Define what data each step expects as input and what it produces as output. When Step 2 expects a customer_email field from Step 1, that is a contract. If Step 1 changes its output format, Step 2 breaks. Document these contracts and validate inputs at each step.

Fail Gracefully

Every step must handle failure. If the AI classifier returns an error, the workflow should not crash — it should route to a fallback (human review). If the email sender fails, save the message for retry. The question is never "will this fail?" but "what happens when it fails?"

Observability

Log every step's input, output, and execution time. You need to answer: "What happened to ticket #4521?" at any time. Without logging, you are debugging blind. Add a unique request ID that flows through every step so you can trace a single request across the entire pipeline.

Common Workflow Patterns

There are three fundamental patterns for organizing workflow steps. Most real workflows combine these patterns.

Sequential (Pipeline)

Steps run one after another. Step 2 waits for Step 1 to finish. Each step's output feeds into the next step's input. This is the simplest pattern — a straight line from trigger to final action.

Example: Receive email → Extract intent → Create ticket → Notify team
Parallel (Fan-out)

Multiple steps run simultaneously. Use this when steps are independent of each other — sending an email while also updating a database while also logging to analytics. Parallel execution reduces total workflow time.

Example: New order → [Send confirmation email | Update inventory | Notify warehouse] (all at once)
Conditional (Router)

Data is routed to different branches based on conditions. An IF/ELSE gate that sends data down different paths. Common in AI workflows where the classification result determines the next action.

Example: Classify ticket → IF billing issue → Finance team | IF technical → Engineering | IF feedback → Product team

Workflow Example: n8n-Style Automation

Here is how you would build a simple webhook-to-Slack automation using n8n concepts in code. This demonstrates the sequential pattern with error handling — the same logic used by visual automation platforms like Make.com and n8n.

Python — Simple webhook-to-Slack workflow with retry logic
import httpx, time, json

class WorkflowStep:
    """Base class for all workflow steps."""
    def execute(self, data: dict) -> dict:
        raise NotImplementedError

class ValidatePayload(WorkflowStep):
    """Step 1: Validate incoming webhook data."""
    def execute(self, data: dict) -> dict:
        required = ["event", "user", "message"]
        missing = [f for f in required if f not in data]
        if missing:
            raise ValueError(f"Missing fields: {missing}")
        return data

class SendSlackMessage(WorkflowStep):
    """Step 2: Post to Slack with retry logic."""
    def execute(self, data: dict) -> dict:
        for attempt in range(3):
            try:
                resp = httpx.post(
                    "https://hooks.slack.com/services/YOUR/WEBHOOK",
                    json={"text": f"{data['user']}: {data['message']}"},
                    timeout=10.0
                )
                resp.raise_for_status()
                return {**data, "slack_sent": True}
            except httpx.HTTPError:
                time.sleep(2 ** attempt)
        return {**data, "slack_sent": False, "error": "All retries failed"}

def run_pipeline(payload: dict):
    """Execute steps sequentially."""
    steps = [ValidatePayload(), SendSlackMessage()]
    data = payload
    for step in steps:
        print(f"Running: {step.__class__.__name__}")
        data = step.execute(data)
    return data
🔒

This lesson is for Pro members

Unlock all 520+ lessons across 52 courses with Academy Pro.

Already a member? Sign in to access your lessons.

Academy
Built with soul — likeone.ai