pallet_permission0/permission/
stream.rs

1use polkadot_sdk::{
2    frame_support::traits::{
3        Currency, ExistenceRequirement, Imbalance, ReservableCurrency, WithdrawReasons,
4    },
5    frame_system,
6    sp_arithmetic::FixedU128,
7    sp_runtime::traits::{Saturating, Zero},
8};
9
10use super::*;
11
12/// Type for stream ID
13pub type StreamId = H256;
14
15/// Stream-specific permission scope
16#[derive(Encode, Decode, CloneNoBound, PartialEq, TypeInfo, MaxEncodedLen, DebugNoBound)]
17#[scale_info(skip_type_params(T))]
18pub struct StreamScope<T: Config> {
19    /// Recipients of the strams and its weights
20    pub recipients: BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>,
21    /// What portion of streams this permission applies to
22    pub allocation: StreamAllocation<T>,
23    /// Distribution control parameters
24    pub distribution: DistributionControl<T>,
25    /// Whether strams should accumulate (can be toggled by enforcement authority)
26    pub accumulating: bool,
27    /// An account responsible for managing the recipients to this permission's streams.
28    /// If left empty, the delegator will be
29    pub recipient_managers: BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>,
30    /// An account responsible for updating the weights of existing recipients. Useful
31    /// for third-party agents to manage how the streams will be distributed.
32    pub weight_setters: BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>,
33}
34
35impl<T: Config> StreamScope<T> {
36    pub(super) fn cleanup(
37        self,
38        permission_id: H256,
39        last_executed: &Option<BlockNumberFor<T>>,
40        delegator: &T::AccountId,
41    ) {
42        match self.allocation {
43            StreamAllocation::Streams(streams) => {
44                for stream in streams.keys() {
45                    AccumulatedStreamAmounts::<T>::remove((delegator, stream, &permission_id));
46                }
47            }
48            StreamAllocation::FixedAmount(amount) if last_executed.is_none() => {
49                T::Currency::unreserve(delegator, amount);
50            }
51            _ => {}
52        }
53    }
54}
55
56/// Defines what portion of streams the permission applies to
57#[derive(Encode, Decode, CloneNoBound, PartialEqNoBound, TypeInfo, MaxEncodedLen, DebugNoBound)]
58#[scale_info(skip_type_params(T))]
59pub enum StreamAllocation<T: Config> {
60    /// Permission applies to a percentage of each stream
61    Streams(BoundedBTreeMap<StreamId, Percent, T::MaxStreamsPerPermission>),
62    /// Permission applies to a specific fixed amount
63    FixedAmount(BalanceOf<T>),
64}
65
66#[derive(Encode, Decode, CloneNoBound, PartialEq, TypeInfo, MaxEncodedLen, DebugNoBound)]
67#[scale_info(skip_type_params(T))]
68pub enum DistributionControl<T: Config> {
69    /// Manual distribution by the delegator
70    Manual,
71    /// Automatic distribution after accumulation threshold
72    Automatic(BalanceOf<T>),
73    /// Distribution at specific block
74    AtBlock(BlockNumberFor<T>),
75    /// Distribution at fixed intervals
76    Interval(BlockNumberFor<T>),
77}
78
79/// Accumulate emissions for a specific agent, distributes if control is met.
80pub(crate) fn do_accumulate_streams<T: Config>(
81    agent: &T::AccountId,
82    stream_id: &StreamId,
83    imbalance: &mut NegativeImbalanceOf<T>,
84) {
85    let initial_balance = imbalance.peek();
86    let total_initial_amount =
87        FixedU128::from_inner(initial_balance.try_into().unwrap_or_default());
88    if total_initial_amount.is_zero() {
89        return;
90    }
91
92    let streams = AccumulatedStreamAmounts::<T>::iter_prefix((agent, stream_id));
93    for (permission_id, accumulated) in streams {
94        let Some(contract) = Permissions::<T>::get(permission_id) else {
95            continue;
96        };
97
98        // Only process stream permissions with percentage allocations,
99        // fixed-amount emission reserves balance upfront on permission creation
100        let PermissionScope::Stream(StreamScope {
101            allocation: StreamAllocation::Streams(streams),
102            accumulating,
103            ..
104        }) = contract.scope
105        else {
106            continue;
107        };
108
109        if !accumulating {
110            continue;
111        }
112
113        let Some(percentage) = streams.get(stream_id) else {
114            continue;
115        };
116
117        let delegated_amount = percentage.mul_floor(total_initial_amount.into_inner());
118        if delegated_amount.is_zero() {
119            continue;
120        }
121
122        let delegated_amount = imbalance
123            .extract(delegated_amount.try_into().unwrap_or_default())
124            .peek();
125
126        AccumulatedStreamAmounts::<T>::set(
127            (agent, stream_id, &permission_id),
128            Some(accumulated.saturating_add(delegated_amount)),
129        );
130
131        Pallet::<T>::deposit_event(Event::AccumulatedEmission {
132            permission_id,
133            stream_id: *stream_id,
134            amount: delegated_amount,
135        });
136    }
137}
138
139pub(crate) fn do_auto_distribution<T: Config>(
140    stream_scope: &StreamScope<T>,
141    permission_id: H256,
142    current_block: BlockNumberFor<T>,
143    contract: &PermissionContract<T>,
144) -> DispatchResult {
145    match stream_scope.distribution {
146        DistributionControl::Automatic(threshold) => {
147            let accumulated = match &stream_scope.allocation {
148                StreamAllocation::Streams(streams) => streams
149                    .keys()
150                    .filter_map(|id| {
151                        AccumulatedStreamAmounts::<T>::get((&contract.delegator, id, permission_id))
152                    })
153                    .fold(BalanceOf::<T>::zero(), |acc, e| acc.saturating_add(e)), // The Balance AST does not enforce the Sum trait
154                StreamAllocation::FixedAmount(amount) => *amount,
155            };
156
157            if accumulated >= threshold {
158                do_distribute_stream::<T>(permission_id, contract, DistributionReason::Automatic)?;
159            }
160        }
161
162        DistributionControl::AtBlock(target_block) if current_block > target_block => {
163            // As we only verify once every 10 blocks, we have to check if current block
164            // is GTE to the target block. To avoid, triggering on every block,
165            // we also verify that the last execution occurred before the target block
166            // (or haven't occurred at all)
167            if contract
168                .last_execution()
169                .is_some_and(|last_execution| last_execution >= target_block)
170            {
171                return Ok(());
172            }
173
174            do_distribute_stream::<T>(permission_id, contract, DistributionReason::Automatic)?;
175        }
176
177        DistributionControl::Interval(interval) => {
178            let last_execution = contract.last_execution.unwrap_or(contract.created_at);
179            if current_block.saturating_sub(last_execution) < interval {
180                return Ok(());
181            }
182
183            do_distribute_stream::<T>(permission_id, contract, DistributionReason::Automatic)?;
184        }
185
186        // Manual distribution doesn't need auto-processing
187        _ => {}
188    }
189
190    Ok(())
191}
192
193#[derive(
194    Encode, Decode, Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, TypeInfo, MaxEncodedLen,
195)]
196pub enum DistributionReason {
197    Automatic,
198    Manual,
199}
200
201/// Distribute accumulated emissions for a permission
202pub(crate) fn do_distribute_stream<T: Config>(
203    permission_id: PermissionId,
204    contract: &PermissionContract<T>,
205    reason: DistributionReason,
206) -> DispatchResult {
207    let PermissionScope::Stream(stream_scope) = &contract.scope else {
208        return Ok(());
209    };
210
211    let total_weight =
212        FixedU128::from_u32(stream_scope.recipients.values().map(|w| *w as u32).sum());
213    if total_weight.is_zero() {
214        trace!("permission {permission_id:?} does not have enough target weight");
215        return Ok(());
216    }
217
218    match &stream_scope.allocation {
219        StreamAllocation::Streams(streams) => {
220            let streams = streams.keys().filter_map(|stream_id| {
221                let acc = AccumulatedStreamAmounts::<T>::get((
222                    &contract.delegator,
223                    stream_id,
224                    permission_id,
225                ))?;
226
227                // You cannot remove the stream from the storage as
228                // it's needed in the accumulation code, avoid using `take`
229                AccumulatedStreamAmounts::<T>::set(
230                    (&contract.delegator, stream_id, permission_id),
231                    Some(Zero::zero()),
232                );
233
234                if acc.is_zero() {
235                    None
236                } else {
237                    // For percentage allocations, mint new tokens
238                    // This is safe because we're only distributing a percentage of
239                    // tokens that were already allocated to emission rewards
240                    Some((stream_id, T::Currency::issue(acc)))
241                }
242            });
243
244            for (stream, mut imbalance) in streams {
245                do_distribute_to_targets(
246                    &mut imbalance,
247                    permission_id,
248                    stream_scope,
249                    Some(stream),
250                    total_weight,
251                    reason,
252                );
253
254                let remainder = imbalance.peek();
255                if !remainder.is_zero() {
256                    AccumulatedStreamAmounts::<T>::mutate(
257                        (&contract.delegator, stream, permission_id),
258                        |acc| *acc = Some(acc.unwrap_or_default().saturating_add(remainder)),
259                    );
260                }
261            }
262        }
263        StreamAllocation::FixedAmount(amount) => {
264            if contract.last_execution().is_some() {
265                // The fixed amount was already distributed
266                return Ok(());
267            }
268
269            // For fixed amount allocations, transfer from reserved funds
270            let _ = T::Currency::unreserve(&contract.delegator, *amount);
271            let mut imbalance = T::Currency::withdraw(
272                &contract.delegator,
273                *amount,
274                WithdrawReasons::TRANSFER,
275                ExistenceRequirement::KeepAlive,
276            )
277            .unwrap_or_else(|_| NegativeImbalanceOf::<T>::zero());
278
279            do_distribute_to_targets(
280                &mut imbalance,
281                permission_id,
282                stream_scope,
283                None,
284                total_weight,
285                reason,
286            );
287        }
288    }
289
290    if let Some(mut contract) = Permissions::<T>::get(permission_id) {
291        contract.tick_execution(<frame_system::Pallet<T>>::block_number())?;
292        Permissions::<T>::set(permission_id, Some(contract));
293    }
294
295    Ok(())
296}
297
298fn do_distribute_to_targets<T: Config>(
299    imbalance: &mut NegativeImbalanceOf<T>,
300    permission_id: PermissionId,
301    stream_scope: &StreamScope<T>,
302    stream: Option<&StreamId>,
303    total_weight: FixedU128,
304    reason: DistributionReason,
305) {
306    let initial_balance = imbalance.peek();
307    let total_initial_amount =
308        FixedU128::from_inner(initial_balance.try_into().unwrap_or_default());
309    if total_initial_amount.is_zero() {
310        trace!("no amount to distribute for permission {permission_id:?} and stream {stream:?}");
311        return;
312    }
313
314    for (target, weight) in stream_scope.recipients.iter() {
315        let target_weight = FixedU128::from_u32(*weight as u32);
316        let target_amount = total_initial_amount
317            .saturating_mul(target_weight)
318            .const_checked_div(total_weight)
319            .unwrap_or_default();
320
321        if target_amount.is_zero() {
322            continue;
323        }
324
325        let target_amount =
326            BalanceOf::<T>::try_from(target_amount.into_inner()).unwrap_or_default();
327        let mut imbalance = imbalance.extract(target_amount);
328
329        if let Some(stream) = stream {
330            // Process recursive accumulation here, only deposit what remains
331            do_accumulate_streams::<T>(target, stream, &mut imbalance);
332        }
333
334        T::Currency::resolve_creating(target, imbalance);
335
336        Pallet::<T>::deposit_event(Event::StreamDistribution {
337            permission_id,
338            stream_id: stream.cloned(),
339            recipient: target.clone(),
340            amount: target_amount,
341            reason,
342        });
343    }
344}