1use crate::{
2 AccumulatedStreamAmounts, BalanceOf, Config, DistributionControl, EnforcementTracking, Error,
3 Event, NegativeImbalanceOf, Pallet, PermissionContract, PermissionDuration, PermissionId,
4 PermissionScope, Permissions, StreamAllocation, StreamScope, generate_permission_id,
5 get_total_allocated_percentage, pallet,
6 permission::{stream::*, *},
7};
8
9use pallet_permission0_api::{
10 DistributionControl as ApiDistributionControl, EnforcementAuthority as ApiEnforcementAuthority,
11 Permission0StreamApi, PermissionDuration as ApiPermissionDuration,
12 RevocationTerms as ApiRevocationTerms, StreamAllocation as ApiStreamAllocation, StreamId,
13};
14use polkadot_sdk::{
15 frame_support::{dispatch::DispatchResult, ensure, traits::ReservableCurrency},
16 frame_system::{self, ensure_signed, ensure_signed_or_root},
17 polkadot_sdk_frame::prelude::{BlockNumberFor, OriginFor},
18 sp_core::{Get, TryCollect},
19 sp_runtime::{
20 BoundedBTreeMap, BoundedBTreeSet, DispatchError, Percent, Vec,
21 traits::{CheckedAdd, Saturating, Zero},
22 },
23};
24
25use pallet_torus0_api::Torus0Api;
26
27impl<T: Config>
28 Permission0StreamApi<
29 T::AccountId,
30 OriginFor<T>,
31 BlockNumberFor<T>,
32 crate::BalanceOf<T>,
33 NegativeImbalanceOf<T>,
34 > for pallet::Pallet<T>
35{
36 fn delegate_stream_permission(
37 delegator: T::AccountId,
38 recipients: Vec<(T::AccountId, u16)>,
39 allocation: ApiStreamAllocation<crate::BalanceOf<T>>,
40 distribution: ApiDistributionControl<crate::BalanceOf<T>, BlockNumberFor<T>>,
41 duration: ApiPermissionDuration<BlockNumberFor<T>>,
42 revocation: ApiRevocationTerms<T::AccountId, BlockNumberFor<T>>,
43 enforcement: ApiEnforcementAuthority<T::AccountId>,
44 recipient_manager: Option<T::AccountId>,
45 weight_setter: Option<T::AccountId>,
46 ) -> Result<PermissionId, DispatchError> {
47 let internal_allocation = match allocation {
48 ApiStreamAllocation::Streams(streams) => StreamAllocation::Streams(
49 streams
50 .try_into()
51 .map_err(|_| crate::Error::<T>::TooManyStreams)?,
52 ),
53 ApiStreamAllocation::FixedAmount(amount) => StreamAllocation::FixedAmount(amount),
54 };
55
56 let internal_distribution = match distribution {
57 ApiDistributionControl::Manual => DistributionControl::Manual,
58 ApiDistributionControl::Automatic(threshold) => {
59 DistributionControl::Automatic(threshold)
60 }
61 ApiDistributionControl::AtBlock(block) => DistributionControl::AtBlock(block),
62 ApiDistributionControl::Interval(interval) => DistributionControl::Interval(interval),
63 };
64
65 let duration = super::translate_duration(duration)?;
66 let revocation = super::translate_revocation_terms(revocation)?;
67 let enforcement = super::translate_enforcement_authority(enforcement)?;
68
69 let recipients = recipients
70 .into_iter()
71 .try_collect()
72 .map_err(|_| crate::Error::<T>::TooManyRecipients)?;
73
74 delegate_stream_permission_impl::<T>(
75 delegator,
76 recipients,
77 internal_allocation,
78 internal_distribution,
79 duration,
80 revocation,
81 enforcement,
82 recipient_manager,
83 weight_setter,
84 )
85 }
86
87 fn accumulate_streams(
88 agent: &T::AccountId,
89 stream: &StreamId,
90 amount: &mut NegativeImbalanceOf<T>,
91 ) {
92 crate::permission::stream::do_accumulate_streams::<T>(agent, stream, amount);
93 }
94
95 fn process_auto_distributions(current_block: BlockNumberFor<T>) {
96 crate::permission::do_auto_permission_execution::<T>(current_block);
97 }
98
99 fn get_accumulated_amount(
100 permission_id: &PermissionId,
101 stream: &StreamId,
102 ) -> crate::BalanceOf<T> {
103 let Some(contract) = Permissions::<T>::get(permission_id) else {
104 return Zero::zero();
105 };
106
107 crate::AccumulatedStreamAmounts::<T>::get((contract.delegator, stream, permission_id))
108 .unwrap_or_default()
109 }
110}
111
112#[allow(clippy::too_many_arguments)]
114pub(crate) fn delegate_stream_permission_impl<T: Config>(
115 delegator: T::AccountId,
116 recipients: BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>,
117 allocation: StreamAllocation<T>,
118 distribution: DistributionControl<T>,
119 duration: PermissionDuration<T>,
120 revocation: RevocationTerms<T>,
121 enforcement: EnforcementAuthority<T>,
122 recipient_manager: Option<T::AccountId>,
123 weight_setter: Option<T::AccountId>,
124) -> Result<PermissionId, DispatchError> {
125 use polkadot_sdk::frame_support::ensure;
126
127 ensure!(
128 T::Torus::is_agent_registered(&delegator),
129 Error::<T>::NotRegisteredAgent
130 );
131
132 validate_stream_permission_recipients::<T>(&delegator, &revocation, &recipients)?;
133
134 match &allocation {
135 StreamAllocation::Streams(streams) => {
136 validate_stream_permission_streams::<T>(streams, &delegator)?;
137 }
138 StreamAllocation::FixedAmount(amount) => {
139 ensure!(*amount > BalanceOf::<T>::zero(), Error::<T>::InvalidAmount);
140 ensure!(
141 T::Currency::can_reserve(&delegator, *amount),
142 Error::<T>::InsufficientBalance
143 );
144 ensure!(
145 matches!(
146 &distribution,
147 DistributionControl::Manual | DistributionControl::AtBlock(_)
148 ),
149 Error::<T>::FixedAmountCanOnlyBeTriggeredOnce
150 );
151 }
152 }
153
154 validate_stream_permission_distribution::<T>(&distribution)?;
155
156 let recipients_ids: Vec<_> = recipients.keys().cloned().collect();
157
158 let scope = PermissionScope::Stream(StreamScope {
159 recipients,
160 allocation: allocation.clone(),
161 distribution,
162 accumulating: true, recipient_managers: validate_stream_managers::<T>(&delegator, recipient_manager)?,
164 weight_setters: validate_stream_managers::<T>(&delegator, weight_setter)?,
165 });
166
167 let permission_id = generate_permission_id::<T>(&delegator, &scope)?;
168
169 let contract =
170 PermissionContract::<T>::new(delegator.clone(), scope, duration, revocation, enforcement);
171
172 Permissions::<T>::insert(permission_id, contract);
173
174 add_permission_indices::<T>(&delegator, recipients_ids.iter(), permission_id)?;
175
176 <Pallet<T>>::deposit_event(Event::PermissionDelegated {
177 delegator: delegator.clone(),
178 permission_id,
179 });
180
181 match allocation {
184 StreamAllocation::FixedAmount(amount) => {
185 T::Currency::reserve(&delegator, amount)?;
186 }
187 StreamAllocation::Streams(streams) => {
188 for stream in streams.keys() {
189 AccumulatedStreamAmounts::<T>::set(
190 (&delegator, stream, permission_id),
191 Some(Zero::zero()),
192 )
193 }
194 }
195 }
196
197 Ok(permission_id)
198}
199
200pub fn execute_permission_impl<T: Config>(
201 permission_id: &PermissionId,
202 contract: &PermissionContract<T>,
203 stream_scope: &StreamScope<T>,
204) -> DispatchResult {
205 match &stream_scope.distribution {
206 DistributionControl::Manual => {
207 ensure!(
208 stream_scope.accumulating,
209 Error::<T>::UnsupportedPermissionType
210 );
211
212 let accumulated = match &stream_scope.allocation {
213 StreamAllocation::Streams(streams) => streams
214 .keys()
215 .filter_map(|id| {
216 AccumulatedStreamAmounts::<T>::get((&contract.delegator, id, permission_id))
217 })
218 .fold(BalanceOf::<T>::zero(), |acc, e| acc.saturating_add(e)), StreamAllocation::FixedAmount(amount) => *amount,
220 };
221
222 ensure!(!accumulated.is_zero(), Error::<T>::NoAccumulatedAmount);
223
224 crate::permission::stream::do_distribute_stream::<T>(
225 *permission_id,
226 contract,
227 DistributionReason::Manual,
228 )?;
229
230 Ok(())
231 }
232 _ => Err(Error::<T>::InvalidDistributionMethod.into()),
233 }
234}
235
236pub fn toggle_permission_accumulation_impl<T: Config>(
238 origin: OriginFor<T>,
239 permission_id: PermissionId,
240 accumulating: bool,
241) -> DispatchResult {
242 let who = ensure_signed_or_root(origin)?;
243
244 let mut contract =
245 Permissions::<T>::get(permission_id).ok_or(Error::<T>::PermissionNotFound)?;
246
247 if let Some(who) = &who {
248 match &contract.enforcement {
249 _ if who == &contract.delegator => {}
250 EnforcementAuthority::None => {
251 return Err(Error::<T>::NotAuthorizedToToggle.into());
252 }
253 EnforcementAuthority::ControlledBy {
254 controllers,
255 required_votes,
256 } => {
257 ensure!(controllers.contains(who), Error::<T>::NotAuthorizedToToggle);
258
259 let referendum = EnforcementReferendum::EmissionAccumulation(accumulating);
260 let votes = EnforcementTracking::<T>::get(permission_id, &referendum)
261 .into_iter()
262 .filter(|id| id != who)
263 .filter(|id| controllers.contains(id))
264 .count();
265
266 if votes.saturating_add(1) < *required_votes as usize {
267 return EnforcementTracking::<T>::mutate(
268 permission_id,
269 referendum.clone(),
270 |votes| {
271 votes
272 .try_insert(who.clone())
273 .map_err(|_| Error::<T>::TooManyControllers)?;
274
275 <Pallet<T>>::deposit_event(Event::EnforcementVoteCast {
276 permission_id,
277 voter: who.clone(),
278 referendum,
279 });
280
281 Ok(())
282 },
283 );
284 }
285 }
286 }
287 }
288
289 match &mut contract.scope {
290 PermissionScope::Stream(stream_scope) => stream_scope.accumulating = accumulating,
291 _ => return Err(Error::<T>::UnsupportedPermissionType.into()),
292 }
293
294 Permissions::<T>::insert(permission_id, contract);
295
296 EnforcementTracking::<T>::remove(
298 permission_id,
299 EnforcementReferendum::EmissionAccumulation(accumulating),
300 );
301
302 <Pallet<T>>::deposit_event(Event::PermissionAccumulationToggled {
303 permission_id,
304 accumulating,
305 toggled_by: who,
306 });
307
308 Ok(())
309}
310
311pub(crate) fn update_stream_permission<T: Config>(
312 origin: OriginFor<T>,
313 permission_id: PermissionId,
314 new_recipients: Option<BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>>,
315 new_streams: Option<BoundedBTreeMap<StreamId, Percent, T::MaxStreamsPerPermission>>,
316 new_distribution_control: Option<DistributionControl<T>>,
317 new_recipient_manager: Option<Option<T::AccountId>>,
318 new_weight_setter: Option<Option<T::AccountId>>,
319) -> DispatchResult {
320 let caller = ensure_signed(origin)?;
321
322 let permission = Permissions::<T>::get(permission_id);
323
324 let Some(mut permission) = permission else {
325 return Err(Error::<T>::PermissionNotFound.into());
326 };
327
328 ensure!(permission.is_updatable(), Error::<T>::NotAuthorizedToEdit);
329
330 let PermissionScope::Stream(mut scope) = permission.scope.clone() else {
331 return Err(Error::<T>::NotEditable.into());
332 };
333
334 let allowed_delegator = permission.delegator == caller;
335 let allowed_weights = allowed_delegator || scope.weight_setters.contains(&caller);
336 let allowed_recipients = allowed_delegator || scope.recipient_managers.contains(&caller);
337
338 if !allowed_delegator && !allowed_weights && !allowed_recipients {
339 return Err(Error::<T>::NotAuthorizedToEdit.into());
340 }
341
342 if let Some(new_recipients) = new_recipients {
343 if !allowed_recipients
344 && (new_recipients.len() != scope.recipients.len()
345 || new_recipients
346 .keys()
347 .any(|k| !scope.recipients.contains_key(k)))
348 {
349 return Err(Error::<T>::NotAuthorizedToEdit.into());
350 }
351
352 validate_stream_permission_recipients::<T>(
353 &permission.delegator,
354 &permission.revocation,
355 &new_recipients,
356 )?;
357
358 crate::permission::remove_permission_from_indices::<T>(
360 &permission.delegator,
361 scope.recipients.keys(),
362 permission_id,
363 );
364
365 scope.recipients = new_recipients;
367
368 crate::permission::add_permission_indices::<T>(
370 &permission.delegator,
371 scope.recipients.keys(),
372 permission_id,
373 )?;
374 }
375
376 if let Some(new_streams) = new_streams {
377 ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
378
379 let StreamAllocation::Streams(streams) = &mut scope.allocation else {
380 return Err(Error::<T>::NotEditable.into());
381 };
382
383 crate::permission::stream::do_distribute_stream::<T>(
384 permission_id,
385 &permission,
386 DistributionReason::Manual,
387 )?;
388
389 for stream in streams.keys() {
390 AccumulatedStreamAmounts::<T>::remove((&permission.delegator, stream, &permission_id));
391 }
392
393 validate_stream_permission_streams::<T>(&new_streams, &permission.delegator)?;
394
395 for stream in new_streams.keys() {
396 AccumulatedStreamAmounts::<T>::set(
397 (&permission.delegator, stream, permission_id),
398 Some(Zero::zero()),
399 )
400 }
401
402 *streams = new_streams;
403 }
404
405 if let Some(new_distribution_control) = new_distribution_control {
406 ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
407
408 validate_stream_permission_distribution::<T>(&new_distribution_control)?;
409 scope.distribution = new_distribution_control;
410 }
411
412 if let Some(new_recipient_manager) = new_recipient_manager {
413 ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
414 scope.recipient_managers =
415 validate_stream_managers::<T>(&permission.delegator, new_recipient_manager)?;
416 }
417
418 if let Some(new_weight_setter) = new_weight_setter {
419 ensure!(allowed_delegator, Error::<T>::NotAuthorizedToEdit);
420 scope.weight_setters =
421 validate_stream_managers::<T>(&permission.delegator, new_weight_setter)?;
422 }
423
424 permission.scope = PermissionScope::Stream(scope);
425 permission.last_update = frame_system::Pallet::<T>::block_number();
426 Permissions::<T>::set(permission_id, Some(permission));
427
428 Ok(())
429}
430
431fn validate_stream_managers<T: Config>(
432 delegator: &T::AccountId,
433 entry: Option<T::AccountId>,
434) -> Result<BoundedBTreeSet<T::AccountId, T::MaxControllersPerPermission>, DispatchError> {
435 let mut set = BoundedBTreeSet::new();
436 let _ = set.try_insert(delegator.clone());
437 if let Some(entry) = entry {
438 let _ = set.try_insert(entry);
439 }
440 Ok(set)
441}
442
443fn validate_stream_permission_recipients<T: Config>(
444 delegator: &T::AccountId,
445 revocation: &RevocationTerms<T>,
446 recipients: &BoundedBTreeMap<T::AccountId, u16, T::MaxRecipientsPerPermission>,
447) -> DispatchResult {
448 if !revocation.is_revokable() {
449 ensure!(!recipients.is_empty(), Error::<T>::NoRecipientsSpecified);
450 }
451
452 for (recipient, weight) in recipients {
453 ensure!(delegator != recipient, Error::<T>::InvalidRecipientWeight);
454 ensure!(*weight > 0, Error::<T>::InvalidRecipientWeight);
455 ensure!(
456 T::Torus::is_agent_registered(recipient),
457 Error::<T>::NotRegisteredAgent
458 );
459 }
460
461 Ok(())
462}
463
464fn validate_stream_permission_streams<T: Config>(
465 streams: &BoundedBTreeMap<StreamId, Percent, T::MaxStreamsPerPermission>,
466 delegator: &T::AccountId,
467) -> DispatchResult {
468 for (stream, percentage) in streams {
469 ensure!(*percentage <= Percent::one(), Error::<T>::InvalidPercentage);
470
471 let total_allocated = get_total_allocated_percentage::<T>(delegator, stream);
472 let new_total_allocated = match total_allocated.checked_add(percentage) {
473 Some(new_total_allocated) => new_total_allocated,
474 None => return Err(Error::<T>::TotalAllocationExceeded.into()),
475 };
476
477 ensure!(
478 new_total_allocated <= Percent::one(),
479 Error::<T>::TotalAllocationExceeded
480 );
481 }
482
483 Ok(())
484}
485
486fn validate_stream_permission_distribution<T: Config>(
487 distribution: &DistributionControl<T>,
488) -> DispatchResult {
489 match distribution {
490 DistributionControl::Automatic(threshold) => {
491 ensure!(
492 *threshold >= T::MinAutoDistributionThreshold::get(),
493 Error::<T>::InvalidThreshold
494 );
495 }
496 DistributionControl::Interval(interval) => {
497 ensure!(!interval.is_zero(), Error::<T>::InvalidInterval);
498 }
499 DistributionControl::AtBlock(block) => {
500 let current_block = <polkadot_sdk::frame_system::Pallet<T>>::block_number();
501 ensure!(*block > current_block, Error::<T>::InvalidInterval);
502 }
503 _ => {}
504 }
505
506 Ok(())
507}