pallet_permission0/ext/
stream_impl.rs

1use crate::{
2    AccumulatedStreamAmounts, BalanceOf, Config, DistributionControl, EnforcementTracking, Error,
3    Event, NegativeImbalanceOf, Pallet, PermissionContract, PermissionDuration, PermissionId,
4    PermissionScope, Permissions, StreamAllocation, StreamScope, generate_permission_id,
5    get_total_allocated_percentage, pallet,
6    permission::{stream::*, *},
7};
8
9use pallet_permission0_api::{
10    DistributionControl as ApiDistributionControl, EnforcementAuthority as ApiEnforcementAuthority,
11    Permission0StreamApi, PermissionDuration as ApiPermissionDuration,
12    RevocationTerms as ApiRevocationTerms, StreamAllocation as ApiStreamAllocation, StreamId,
13};
14use polkadot_sdk::{
15    frame_support::{dispatch::DispatchResult, ensure, traits::ReservableCurrency},
16    frame_system::{self, ensure_signed, ensure_signed_or_root},
17    polkadot_sdk_frame::prelude::{BlockNumberFor, OriginFor},
18    sp_core::{Get, TryCollect},
19    sp_runtime::{
20        BoundedBTreeMap, BoundedBTreeSet, DispatchError, Percent, Vec,
21        traits::{CheckedAdd, Saturating, Zero},
22    },
23};
24
25use pallet_torus0_api::Torus0Api;
26
27impl<T: Config>
28    Permission0StreamApi<
29        T::AccountId,
30        OriginFor<T>,
31        BlockNumberFor<T>,
32        crate::BalanceOf<T>,
33        NegativeImbalanceOf<T>,
34    > for pallet::Pallet<T>
35{
36    fn delegate_stream_permission(
37        delegator: T::AccountId,
38        recipients: Vec<(T::AccountId, u16)>,
39        allocation: ApiStreamAllocation<crate::BalanceOf<T>>,
40        distribution: ApiDistributionControl<crate::BalanceOf<T>, BlockNumberFor<T>>,
41        duration: ApiPermissionDuration<BlockNumberFor<T>>,
42        revocation: ApiRevocationTerms<T::AccountId, BlockNumberFor<T>>,
43        enforcement: ApiEnforcementAuthority<T::AccountId>,
44        recipient_manager: Option<T::AccountId>,
45        weight_setter: Option<T::AccountId>,
46    ) -> Result<PermissionId, DispatchError> {
47        let internal_allocation = match allocation {
48            ApiStreamAllocation::Streams(streams) => StreamAllocation::Streams(
49                streams
50                    .try_into()
51                    .map_err(|_| crate::Error::<T>::TooManyStreams)?,
52            ),
53            ApiStreamAllocation::FixedAmount(amount) => StreamAllocation::FixedAmount(amount),
54        };
55
56        let internal_distribution = match distribution {
57            ApiDistributionControl::Manual => DistributionControl::Manual,
58            ApiDistributionControl::Automatic(threshold) => {
59                DistributionControl::Automatic(threshold)
60            }
61            ApiDistributionControl::AtBlock(block) => DistributionControl::AtBlock(block),
62            ApiDistributionControl::Interval(interval) => DistributionControl::Interval(interval),
63        };
64
65        let duration = super::translate_duration(duration)?;
66        let revocation = super::translate_revocation_terms(revocation)?;
67        let enforcement = super::translate_enforcement_authority(enforcement)?;
68
69        let recipients = recipients
70            .into_iter()
71            .try_collect()
72            .map_err(|_| crate::Error::<T>::TooManyRecipients)?;
73
74        delegate_stream_permission_impl::<T>(
75            delegator,
76            recipients,
77            internal_allocation,
78            internal_distribution,
79            duration,
80            revocation,
81            enforcement,
82            recipient_manager,
83            weight_setter,
84        )
85    }
86
87    fn accumulate_streams(
88        agent: &T::AccountId,
89        stream: &StreamId,
90        amount: &mut NegativeImbalanceOf<T>,
91    ) {
92        crate::permission::stream::do_accumulate_streams::<T>(agent, stream, amount);
93    }
94
95    fn process_auto_distributions(current_block: BlockNumberFor<T>) {
96        crate::permission::do_auto_permission_execution::<T>(current_block);
97    }
98
99    fn get_accumulated_amount(
100        permission_id: &PermissionId,
101        stream: &StreamId,
102    ) -> crate::BalanceOf<T> {
103        let Some(contract) = Permissions::<T>::get(permission_id) else {
104            return Zero::zero();
105        };
106
107        crate::AccumulatedStreamAmounts::<T>::get((contract.delegator, stream, permission_id))
108            .unwrap_or_default()
109    }
110}
111
112/// Delegate a permission implementation
113#[allow(clippy::too_many_arguments)]
114pub(crate) fn delegate_stream_permission_impl<T: Config>(
115    delegator: T::AccountId,
116    recipients: BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>,
117    allocation: StreamAllocation<T>,
118    distribution: DistributionControl<T>,
119    duration: PermissionDuration<T>,
120    revocation: RevocationTerms<T>,
121    enforcement: EnforcementAuthority<T>,
122    recipient_manager: Option<T::AccountId>,
123    weight_setter: Option<T::AccountId>,
124) -> Result<PermissionId, DispatchError> {
125    use polkadot_sdk::frame_support::ensure;
126
127    ensure!(
128        T::Torus::is_agent_registered(&delegator),
129        Error::<T>::NotRegisteredAgent
130    );
131
132    validate_stream_permission_recipients::<T>(&delegator, &revocation, &recipients)?;
133
134    match &allocation {
135        StreamAllocation::Streams(streams) => {
136            validate_stream_permission_streams::<T>(streams, &delegator)?;
137        }
138        StreamAllocation::FixedAmount(amount) => {
139            ensure!(*amount > BalanceOf::<T>::zero(), Error::<T>::InvalidAmount);
140            ensure!(
141                T::Currency::can_reserve(&delegator, *amount),
142                Error::<T>::InsufficientBalance
143            );
144            ensure!(
145                matches!(
146                    &distribution,
147                    DistributionControl::Manual | DistributionControl::AtBlock(_)
148                ),
149                Error::<T>::FixedAmountCanOnlyBeTriggeredOnce
150            );
151        }
152    }
153
154    validate_stream_permission_distribution::<T>(&distribution)?;
155
156    let recipients_ids: Vec<_> = recipients.keys().cloned().collect();
157
158    let scope = PermissionScope::Stream(StreamScope {
159        recipients,
160        allocation: allocation.clone(),
161        distribution,
162        accumulating: true, // Start with accumulation enabled by default
163        recipient_managers: validate_stream_managers::<T>(&delegator, recipient_manager)?,
164        weight_setters: validate_stream_managers::<T>(&delegator, weight_setter)?,
165    });
166
167    let permission_id = generate_permission_id::<T>(&delegator, &scope)?;
168
169    let contract =
170        PermissionContract::<T>::new(delegator.clone(), scope, duration, revocation, enforcement);
171
172    Permissions::<T>::insert(permission_id, contract);
173
174    add_permission_indices::<T>(&delegator, recipients_ids.iter(), permission_id)?;
175
176    <Pallet<T>>::deposit_event(Event::PermissionDelegated {
177        delegator: delegator.clone(),
178        permission_id,
179    });
180
181    // Reserve funds if fixed amount allocation. We use the Balances API for this.
182    // This means total issuance is always correct.
183    match allocation {
184        StreamAllocation::FixedAmount(amount) => {
185            T::Currency::reserve(&delegator, amount)?;
186        }
187        StreamAllocation::Streams(streams) => {
188            for stream in streams.keys() {
189                AccumulatedStreamAmounts::<T>::set(
190                    (&delegator, stream, permission_id),
191                    Some(Zero::zero()),
192                )
193            }
194        }
195    }
196
197    Ok(permission_id)
198}
199
200pub fn execute_permission_impl<T: Config>(
201    permission_id: &PermissionId,
202    contract: &PermissionContract<T>,
203    stream_scope: &StreamScope<T>,
204) -> DispatchResult {
205    match &stream_scope.distribution {
206        DistributionControl::Manual => {
207            ensure!(
208                stream_scope.accumulating,
209                Error::<T>::UnsupportedPermissionType
210            );
211
212            let accumulated = match &stream_scope.allocation {
213                StreamAllocation::Streams(streams) => streams
214                    .keys()
215                    .filter_map(|id| {
216                        AccumulatedStreamAmounts::<T>::get((&contract.delegator, id, permission_id))
217                    })
218                    .fold(BalanceOf::<T>::zero(), |acc, e| acc.saturating_add(e)), // The Balance AST does not enforce the Sum trait
219                StreamAllocation::FixedAmount(amount) => *amount,
220            };
221
222            ensure!(!accumulated.is_zero(), Error::<T>::NoAccumulatedAmount);
223
224            crate::permission::stream::do_distribute_stream::<T>(
225                *permission_id,
226                contract,
227                DistributionReason::Manual,
228            )?;
229
230            Ok(())
231        }
232        _ => Err(Error::<T>::InvalidDistributionMethod.into()),
233    }
234}
235
236/// Toggle a permission's accumulation state
237pub fn toggle_permission_accumulation_impl<T: Config>(
238    origin: OriginFor<T>,
239    permission_id: PermissionId,
240    accumulating: bool,
241) -> DispatchResult {
242    let who = ensure_signed_or_root(origin)?;
243
244    let mut contract =
245        Permissions::<T>::get(permission_id).ok_or(Error::<T>::PermissionNotFound)?;
246
247    if let Some(who) = &who {
248        match &contract.enforcement {
249            _ if who == &contract.delegator => {}
250            EnforcementAuthority::None => {
251                return Err(Error::<T>::NotAuthorizedToToggle.into());
252            }
253            EnforcementAuthority::ControlledBy {
254                controllers,
255                required_votes,
256            } => {
257                ensure!(controllers.contains(who), Error::<T>::NotAuthorizedToToggle);
258
259                let referendum = EnforcementReferendum::EmissionAccumulation(accumulating);
260                let votes = EnforcementTracking::<T>::get(permission_id, &referendum)
261                    .into_iter()
262                    .filter(|id| id != who)
263                    .filter(|id| controllers.contains(id))
264                    .count();
265
266                if votes.saturating_add(1) < *required_votes as usize {
267                    return EnforcementTracking::<T>::mutate(
268                        permission_id,
269                        referendum.clone(),
270                        |votes| {
271                            votes
272                                .try_insert(who.clone())
273                                .map_err(|_| Error::<T>::TooManyControllers)?;
274
275                            <Pallet<T>>::deposit_event(Event::EnforcementVoteCast {
276                                permission_id,
277                                voter: who.clone(),
278                                referendum,
279                            });
280
281                            Ok(())
282                        },
283                    );
284                }
285            }
286        }
287    }
288
289    match &mut contract.scope {
290        PermissionScope::Stream(stream_scope) => stream_scope.accumulating = accumulating,
291        _ => return Err(Error::<T>::UnsupportedPermissionType.into()),
292    }
293
294    Permissions::<T>::insert(permission_id, contract);
295
296    // Clear any votes for this referendum
297    EnforcementTracking::<T>::remove(
298        permission_id,
299        EnforcementReferendum::EmissionAccumulation(accumulating),
300    );
301
302    <Pallet<T>>::deposit_event(Event::PermissionAccumulationToggled {
303        permission_id,
304        accumulating,
305        toggled_by: who,
306    });
307
308    Ok(())
309}
310
311pub(crate) fn update_stream_permission<T: Config>(
312    origin: OriginFor<T>,
313    permission_id: PermissionId,
314    new_recipients: Option<BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>>,
315    new_streams: Option<BoundedBTreeMap<StreamId, Percent, T::MaxStreamsPerPermission>>,
316    new_distribution_control: Option<DistributionControl<T>>,
317    new_recipient_manager: Option<Option<T::AccountId>>,
318    new_weight_setter: Option<Option<T::AccountId>>,
319) -> DispatchResult {
320    let caller = ensure_signed(origin)?;
321
322    let permission = Permissions::<T>::get(permission_id);
323
324    let Some(mut permission) = permission else {
325        return Err(Error::<T>::PermissionNotFound.into());
326    };
327
328    ensure!(permission.is_updatable(), Error::<T>::NotAuthorizedToEdit);
329
330    let PermissionScope::Stream(mut scope) = permission.scope.clone() else {
331        return Err(Error::<T>::NotEditable.into());
332    };
333
334    let allowed_delegator = permission.delegator == caller;
335    let allowed_weights = allowed_delegator || scope.weight_setters.contains(&caller);
336    let allowed_recipients = allowed_delegator || scope.recipient_managers.contains(&caller);
337
338    if !allowed_delegator && !allowed_weights && !allowed_recipients {
339        return Err(Error::<T>::NotAuthorizedToEdit.into());
340    }
341
342    if let Some(new_recipients) = new_recipients {
343        if !allowed_recipients
344            && (new_recipients.len() != scope.recipients.len()
345                || new_recipients
346                    .keys()
347                    .any(|k| !scope.recipients.contains_key(k)))
348        {
349            return Err(Error::<T>::NotAuthorizedToEdit.into());
350        }
351
352        validate_stream_permission_recipients::<T>(
353            &permission.delegator,
354            &permission.revocation,
355            &new_recipients,
356        )?;
357
358        // Remove old indices for current recipients
359        crate::permission::remove_permission_from_indices::<T>(
360            &permission.delegator,
361            scope.recipients.keys(),
362            permission_id,
363        );
364
365        // Update recipients
366        scope.recipients = new_recipients;
367
368        // Add new indices for updated recipients
369        crate::permission::add_permission_indices::<T>(
370            &permission.delegator,
371            scope.recipients.keys(),
372            permission_id,
373        )?;
374    }
375
376    if let Some(new_streams) = new_streams {
377        ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
378
379        let StreamAllocation::Streams(streams) = &mut scope.allocation else {
380            return Err(Error::<T>::NotEditable.into());
381        };
382
383        crate::permission::stream::do_distribute_stream::<T>(
384            permission_id,
385            &permission,
386            DistributionReason::Manual,
387        )?;
388
389        for stream in streams.keys() {
390            AccumulatedStreamAmounts::<T>::remove((&permission.delegator, stream, &permission_id));
391        }
392
393        validate_stream_permission_streams::<T>(&new_streams, &permission.delegator)?;
394
395        for stream in new_streams.keys() {
396            AccumulatedStreamAmounts::<T>::set(
397                (&permission.delegator, stream, permission_id),
398                Some(Zero::zero()),
399            )
400        }
401
402        *streams = new_streams;
403    }
404
405    if let Some(new_distribution_control) = new_distribution_control {
406        ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
407
408        validate_stream_permission_distribution::<T>(&new_distribution_control)?;
409        scope.distribution = new_distribution_control;
410    }
411
412    if let Some(new_recipient_manager) = new_recipient_manager {
413        ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
414        scope.recipient_managers =
415            validate_stream_managers::<T>(&permission.delegator, new_recipient_manager)?;
416    }
417
418    if let Some(new_weight_setter) = new_weight_setter {
419        ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
420        scope.weight_setters =
421            validate_stream_managers::<T>(&permission.delegator, new_weight_setter)?;
422    }
423
424    permission.scope = PermissionScope::Stream(scope);
425    permission.last_update = frame_system::Pallet::<T>::block_number();
426    Permissions::<T>::set(permission_id, Some(permission));
427
428    Ok(())
429}
430
431fn validate_stream_managers<T: Config>(
432    delegator: &T::AccountId,
433    entry: Option<T::AccountId>,
434) -> Result<BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>, DispatchError> {
435    let mut set = BoundedBTreeSet::new();
436    let _ = set.try_insert(delegator.clone());
437    if let Some(entry) = entry {
438        let _ = set.try_insert(entry);
439    }
440    Ok(set)
441}
442
443fn validate_stream_permission_recipients<T: Config>(
444    delegator: &T::AccountId,
445    revocation: &RevocationTerms<T>,
446    recipients: &BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>,
447) -> DispatchResult {
448    if !revocation.is_revokable() {
449        ensure!(!recipients.is_empty(), Error::<T>::NoRecipientsSpecified);
450    }
451
452    for (recipient, weight) in recipients {
453        ensure!(delegator != recipient, Error::<T>::InvalidRecipientWeight);
454        ensure!(*weight > 0, Error::<T>::InvalidRecipientWeight);
455        ensure!(
456            T::Torus::is_agent_registered(recipient),
457            Error::<T>::NotRegisteredAgent
458        );
459    }
460
461    Ok(())
462}
463
464fn validate_stream_permission_streams<T: Config>(
465    streams: &BoundedBTreeMap<StreamId, Percent, T::MaxStreamsPerPermission>,
466    delegator: &T::AccountId,
467) -> DispatchResult {
468    for (stream, percentage) in streams {
469        ensure!(*percentage <= Percent::one(), Error::<T>::InvalidPercentage);
470
471        let total_allocated = get_total_allocated_percentage::<T>(delegator, stream);
472        let new_total_allocated = match total_allocated.checked_add(percentage) {
473            Some(new_total_allocated) => new_total_allocated,
474            None => return Err(Error::<T>::TotalAllocationExceeded.into()),
475        };
476
477        ensure!(
478            new_total_allocated <= Percent::one(),
479            Error::<T>::TotalAllocationExceeded
480        );
481    }
482
483    Ok(())
484}
485
486fn validate_stream_permission_distribution<T: Config>(
487    distribution: &DistributionControl<T>,
488) -> DispatchResult {
489    match distribution {
490        DistributionControl::Automatic(threshold) => {
491            ensure!(
492                *threshold >= T::MinAutoDistributionThreshold::get(),
493                Error::<T>::InvalidThreshold
494            );
495        }
496        DistributionControl::Interval(interval) => {
497            ensure!(!interval.is_zero(), Error::<T>::InvalidInterval);
498        }
499        DistributionControl::AtBlock(block) => {
500            let current_block = <polkadot_sdk::frame_system::Pallet<T>>::block_number();
501            ensure!(*block > current_block, Error::<T>::InvalidInterval);
502        }
503        _ => {}
504    }
505
506    Ok(())
507}