// openzeppelin_relayer/jobs/handlers/notification_handler.rs

//! Notification handling worker implementation.
//!
//! This module implements the notification handling worker that processes
//! notification jobs from the queue.

use actix_web::web::ThinData;
use apalis::prelude::{Attempt, Data, *};
use eyre::Result;
use log::info;

use crate::{
    constants::WORKER_DEFAULT_MAXIMUM_RETRIES,
    jobs::{handle_result, Job, JobProducer, NotificationSend},
    models::AppState,
    repositories::Repository,
    services::WebhookNotificationService,
};

/// Worker entry point that processes a single queued notification job.
///
/// Logs the job payload, delegates the actual delivery to `handle_request`,
/// and hands the outcome to `handle_result` together with the current attempt
/// and `WORKER_DEFAULT_MAXIMUM_RETRIES`.
///
/// # Arguments
/// * `job` - The queued job wrapping the `NotificationSend` payload
/// * `context` - Shared application state; its notification repository is used
///   to look up the delivery target
/// * `attempt` - Attempt value supplied by the worker runtime and forwarded
///   unchanged to `handle_result` (presumably for retry accounting — see
///   `handle_result` for the exact policy)
///
/// # Returns
/// * `Result<(), Error>` - Whatever `handle_result` yields for the delivery outcome
pub async fn notification_handler(
    job: Job<NotificationSend>,
    context: Data<ThinData<AppState<JobProducer>>>,
    attempt: Attempt,
) -> Result<(), Error> {
    // Take ownership of the payload once; it is logged and then consumed.
    let request = job.data;
    info!("handling notification: {:?}", request);

    let outcome = handle_request(request, context).await;

    handle_result(outcome, attempt, "Notification", WORKER_DEFAULT_MAXIMUM_RETRIES)
}

/// Resolves the notification target and delivers the webhook payload.
///
/// Fetches the record identified by `request.notification_id` from the
/// notification repository, builds a `WebhookNotificationService` from that
/// record's `url` and `signing_key`, and sends `request.notification` through
/// it. A failure in either the lookup or the send is propagated to the caller.
async fn handle_request(
    request: NotificationSend,
    context: Data<ThinData<AppState<JobProducer>>>,
) -> Result<()> {
    info!("sending notification: {:?}", request);

    // Look up where (and with which signing key) this notification must go.
    let target = context
        .notification_repository
        .get_by_id(request.notification_id)
        .await?;

    // Build the webhook client for that target and deliver the payload.
    WebhookNotificationService::new(target.url, target.signing_key)
        .send_notification(request.notification)
        .await?;

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::models::{
        EvmPolicyResponse, EvmTransactionResponse, NetworkPolicyResponse, NetworkType,
        RelayerDisabledPayload, RelayerResponse, TransactionResponse, TransactionStatus,
        WebhookNotification, WebhookPayload, U256,
    };

    /// Builds the confirmed EVM transaction payload shared by the tests below.
    fn evm_transaction_payload() -> WebhookPayload {
        // All three timestamps use the same fixed instant.
        let timestamp = "2025-01-27T15:31:10.777083+00:00";

        WebhookPayload::Transaction(TransactionResponse::Evm(EvmTransactionResponse {
            id: "tx123".to_string(),
            hash: Some("0x123".to_string()),
            status: TransactionStatus::Confirmed,
            created_at: timestamp.to_string(),
            sent_at: Some(timestamp.to_string()),
            confirmed_at: Some(timestamp.to_string()),
            gas_price: Some(1000000000),
            gas_limit: 21000,
            nonce: Some(1),
            value: U256::from(1000000000000000000_u64),
            from: "0xabc".to_string(),
            to: Some("0xdef".to_string()),
            relayer_id: "relayer-1".to_string(),
        }))
    }

    /// Builds a relayer-disabled payload for an unpaused EVM relayer.
    fn relayer_disabled_payload() -> WebhookPayload {
        WebhookPayload::RelayerDisabled(RelayerDisabledPayload {
            relayer: RelayerResponse {
                id: "relayer-1".to_string(),
                name: "relayer-1".to_string(),
                network: "ethereum".to_string(),
                network_type: NetworkType::Evm,
                paused: false,
                policies: NetworkPolicyResponse::Evm(EvmPolicyResponse {
                    gas_price_cap: None,
                    whitelist_receivers: None,
                    eip1559_pricing: None,
                    private_transactions: false,
                    min_balance: 0,
                }),
                address: "0xabc".to_string(),
                system_disabled: false,
            },
            disable_reason: "test".to_string(),
        })
    }

    /// A `NotificationSend` wrapped in a `Job` keeps its id and event name.
    #[tokio::test]
    async fn test_notification_job_creation() {
        let notification =
            WebhookNotification::new("test_event".to_string(), evm_transaction_payload());

        // The notification is moved into the job payload; no clone is needed
        // because it is not used again afterwards.
        let notification_job = NotificationSend::new("notification-1".to_string(), notification);

        let job = Job::new(crate::jobs::JobType::NotificationSend, notification_job);

        assert_eq!(job.data.notification_id, "notification-1");
        assert_eq!(job.data.notification.event, "test_event");
    }

    /// `NotificationSend` can carry each supported webhook payload variant.
    #[tokio::test]
    async fn test_notification_job_with_different_payloads() {
        // Transaction payload variant.
        let transaction_notification = WebhookNotification::new(
            "transaction_payload".to_string(),
            evm_transaction_payload(),
        );
        let job = NotificationSend::new(
            "notification-string".to_string(),
            transaction_notification,
        );
        assert_eq!(job.notification_id, "notification-string");
        assert_eq!(job.notification.event, "transaction_payload");

        // Relayer-disabled payload variant.
        let relayer_disabled_notification =
            WebhookNotification::new("object_event".to_string(), relayer_disabled_payload());
        let job = NotificationSend::new(
            "notification-object".to_string(),
            relayer_disabled_notification,
        );
        assert_eq!(job.notification_id, "notification-object");
        assert_eq!(job.notification.event, "object_event");
    }
}